448156 Fixed INACTIVE race in IteratingCallback

Simplified the ICB statemachine with the addition of a lock state to handle the case of iterate() called while a
previous process() iteration is just existing.

Also added the CALLED state to replace INACTIVE when a callback is called before PENDING state entered.

renamed INACTIVE to IDLE
This commit is contained in:
Greg Wilkins 2014-10-22 14:04:32 +11:00
parent 592265c21f
commit 34782005b2
1 changed files with 203 additions and 146 deletions

View File

@ -56,38 +56,47 @@ public abstract class IteratingCallback implements Callback
private enum State
{
/**
* This callback is inactive, ready to iterate.
* This callback is IDLE, ready to iterate.
*/
INACTIVE,
IDLE,
/**
* This callback is iterating and {@link #process()} has scheduled an
* asynchronous operation by returning {@link Action#SCHEDULED}, but
* the operation is still undergoing.
* This callback is iterating calls to {@link #process()} and is dealing with
* the returns. To get into processing state, it much of held the lock state
* and set iterating to true.
*/
ACTIVE,
PROCESSING,
/**
* This callback is iterating and {@link #process()} has been called
* but not returned yet.
* Waiting for a schedule callback
*/
ITERATING,
PENDING,
/**
* While this callback was iterating, another request for iteration
* has been issued, so the iteration must continue even if a previous
* call to {@link #process()} returned {@link Action#IDLE}.
* Called by a schedule callback
*/
ITERATE_AGAIN,
CALLED,
/**
* The overall job has succeeded.
* The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return
* from {@link IteratingCallback#process()}
*/
SUCCEEDED,
/**
* The overall job has failed.
* The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)}
*/
FAILED,
/**
* This callback has been closed and cannot be reset.
*/
CLOSED
CLOSED,
/**
* State is locked while leaving processing state to check the iterate boolean
*/
LOCKED
}
/**
@ -108,6 +117,7 @@ public abstract class IteratingCallback implements Callback
* may have not yet been invoked.
*/
SCHEDULED,
/**
* Indicates that {@link #process()} has completed the overall job.
*/
@ -115,15 +125,17 @@ public abstract class IteratingCallback implements Callback
}
private final AtomicReference<State> _state;
private boolean _iterate;
protected IteratingCallback()
{
_state = new AtomicReference<>(State.INACTIVE);
_state = new AtomicReference<>(State.IDLE);
}
protected IteratingCallback(boolean needReset)
{
_state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.INACTIVE);
_state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.IDLE);
}
/**
@ -172,126 +184,179 @@ public abstract class IteratingCallback implements Callback
/**
* This method must be invoked by applications to start the processing
* of sub tasks.
* <p/>
* If {@link #process()} returns {@link Action#IDLE}, then this method
* should be called again to restart processing.
* It is safe to call iterate multiple times from multiple threads since only
* the first thread to move the state out of INACTIVE will actually do any iteration
* and processing.
* of sub tasks. It can be called at any time by any thread, and it's
* contract is that when called, then the {@link #process()} method will
* be called during or soon after, either by the calling thread or by
* another thread.
*/
public void iterate()
{
try
loop: while (true)
{
while (true)
State state=_state.get();
switch (state)
{
switch (_state.get())
{
case INACTIVE:
{
if (processIterations())
return;
break;
}
case ITERATING:
{
if (_state.compareAndSet(State.ITERATING, State.ITERATE_AGAIN))
return;
break;
}
default:
{
return;
}
}
case PENDING:
case CALLED:
// process will be called when callback is handled
break loop;
case IDLE:
if (!_state.compareAndSet(State.IDLE,State.PROCESSING))
continue;
processing();
break loop;
case PROCESSING:
if (!_state.compareAndSet(State.PROCESSING,State.LOCKED))
continue;
// Tell the thread that is processing that it must iterate again
_iterate=true;
_state.set(State.PROCESSING);
break loop;
case LOCKED:
Thread.yield();
continue;
default:
throw new IllegalStateException("state="+state);
}
}
catch (Throwable x)
{
failed(x);
}
}
private boolean processIterations() throws Exception
private void processing()
{
// Keeps iterating as long as succeeded() is called during process().
// If we are in INACTIVE state, either this is the first iteration or
// succeeded()/failed() were called already.
while (_state.compareAndSet(State.INACTIVE, State.ITERATING))
// This should only ever be called when in processing state, however a failed or close call
// may happen concurrently, so state is not assumed.
// While we are processing
processing: while (true)
{
// Method process() can only be called by one thread at a time because
// it is guarded by the CaS above. However, the case blocks below may
// be executed concurrently in this case: T1 calls process() which
// executes the asynchronous sub task, which calls succeeded(), which
// moves the state into INACTIVE, then returns SCHEDULED; T2 calls
// iterate(), state is now INACTIVE and process() is called again and
// returns another action. Now we have 2 threads that may execute the
// action case blocks below concurrently; therefore each case block
// has to be prepared to fail the CaS it's doing.
Action action = process();
switch (action)
// Call process to get the action that we have to take.
Action action;
try
{
case IDLE:
{
// No more progress can be made.
if (_state.compareAndSet(State.ITERATING, State.INACTIVE))
return true;
action = process();
}
catch (Throwable x)
{
failed(x);
break processing;
}
// Was iterate() called again since we already decided to go INACTIVE ?
// If so, try another iteration as more work may have been added
// while the previous call to process() was returning.
if (_state.compareAndSet(State.ITERATE_AGAIN, State.INACTIVE))
continue;
// State may have changed concurrently, try again.
continue;
}
case SCHEDULED:
// loop until we have successfully acted on the action we have just received
acting: while(true)
{
// action handling needs to know the state
State state=_state.get();
switch (action)
{
// The sub task is executing, and the callback for it may or
// may not have already been called yet, which we figure out below.
// Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE.
if (_state.compareAndSet(State.ITERATING, State.ACTIVE) ||
_state.compareAndSet(State.ITERATE_AGAIN, State.ACTIVE))
// Not called back yet, so wait.
return true;
// Call back must have happened, so iterate.
continue;
}
case SUCCEEDED:
{
// The overall job has completed.
while (true)
case IDLE:
{
State current = _state.get();
switch(current)
// We need to change to idle state, but may stay in processing if there
// is a concurrent call to iterate()
switch (state)
{
case PROCESSING:
{
// lock the state
if (!_state.compareAndSet(State.PROCESSING,State.LOCKED))
continue acting;
// Has iterate been called while we were processing?
if (_iterate)
{
// yes, so skip idle and keep processing
_iterate=false;
_state.set(State.PROCESSING);
continue processing;
}
// No, so we can go idle
_state.set(State.IDLE);
break processing;
}
case LOCKED:
{
Thread.yield();
continue;
}
default:
throw new IllegalStateException("state="+state);
}
}
case SCHEDULED:
{
// the call to process has scheduled a callback to succeeded() or failed()
// these callbacks are in a race with us changing to pending state. If we win the
// race, then the callback have to keep processing, otherwise if we lose the race
// we have to keep processing.
switch(state)
{
case PROCESSING:
{
if (!_state.compareAndSet(State.PROCESSING, State.PENDING))
continue acting;
// we won the race, so the callback has to process and we can break processing
break processing;
}
case CALLED:
{
if (!_state.compareAndSet(State.CALLED, State.PROCESSING))
continue acting;
// we lost the race, so we have to keep processing
continue processing;
}
case LOCKED:
{
Thread.yield();
continue;
}
case FAILED:
case CLOSED:
{
break processing;
}
default:
throw new IllegalStateException("state="+state);
}
}
case SUCCEEDED:
{
// process() has told us that there is no more work to do, so we need
// to switch to SUCCEEDED state. We can now ignore any concurrent calls
// to iterate() until reset() is called.
switch(state)
{
case SUCCEEDED:
case FAILED:
// Already complete!.
return true;
case CLOSED:
throw new IllegalStateException();
default:
if (_state.compareAndSet(current, State.SUCCEEDED))
break processing;
case PROCESSING:
if (_state.compareAndSet(State.PROCESSING, State.SUCCEEDED))
{
onCompleteSuccess();
return true;
break processing;
}
default:
throw new IllegalStateException("state="+state);
}
}
}
default:
{
throw new IllegalStateException(toString());
default:
throw new IllegalStateException("action="+action);
}
}
}
return false;
}
}
}
/**
* Invoked when the sub task succeeds.
* Subclasses that override this method must always remember to call
@ -300,40 +365,32 @@ public abstract class IteratingCallback implements Callback
@Override
public void succeeded()
{
while (true)
loop: while (true)
{
State current = _state.get();
switch (current)
{
case ITERATE_AGAIN:
case ITERATING:
case PROCESSING:
{
if (_state.compareAndSet(current, State.INACTIVE))
return;
continue;
if (!_state.compareAndSet(State.PROCESSING, State.CALLED))
continue loop;
break loop;
}
case ACTIVE:
case PENDING:
{
// If we can move from ACTIVE to INACTIVE
// then we are responsible to call iterate().
if (_state.compareAndSet(current, State.INACTIVE))
iterate();
// If we can't CaS, then failed() must have been
// called, and we just return.
return;
}
case INACTIVE:
{
// Support the case where the callback is scheduled
// externally without a call to iterate().
iterate();
return;
if (!_state.compareAndSet(State.PENDING, State.PROCESSING))
continue loop;
processing();
break loop;
}
case CLOSED:
{
// Too late!
return;
break loop;
}
default:
{
throw new IllegalStateException(toString());
@ -352,12 +409,12 @@ public abstract class IteratingCallback implements Callback
{
while (true)
{
State current = _state.get();
switch (current)
State state = _state.get();
switch (state)
{
case SUCCEEDED:
case FAILED:
case INACTIVE:
case IDLE:
case CLOSED:
{
// Already complete!.
@ -365,7 +422,7 @@ public abstract class IteratingCallback implements Callback
}
default:
{
if (_state.compareAndSet(current, State.FAILED))
if (_state.compareAndSet(state, State.FAILED))
{
onCompleteFailure(x);
return;
@ -379,20 +436,20 @@ public abstract class IteratingCallback implements Callback
{
while (true)
{
State current = _state.get();
switch (current)
State state = _state.get();
switch (state)
{
case INACTIVE:
case IDLE:
case SUCCEEDED:
case FAILED:
{
if (_state.compareAndSet(current, State.CLOSED))
if (_state.compareAndSet(state, State.CLOSED))
return;
break;
}
default:
{
if (_state.compareAndSet(current, State.CLOSED))
if (_state.compareAndSet(state, State.CLOSED))
{
onCompleteFailure(new IllegalStateException("Closed with pending callback " + this));
return;
@ -408,7 +465,7 @@ public abstract class IteratingCallback implements Callback
*/
boolean isIdle()
{
return _state.get() == State.INACTIVE;
return _state.get() == State.IDLE;
}
public boolean isClosed()
@ -446,16 +503,16 @@ public abstract class IteratingCallback implements Callback
{
switch(_state.get())
{
case INACTIVE:
case IDLE:
return true;
case SUCCEEDED:
if (_state.compareAndSet(State.SUCCEEDED, State.INACTIVE))
if (_state.compareAndSet(State.SUCCEEDED, State.IDLE))
return true;
break;
case FAILED:
if (_state.compareAndSet(State.FAILED, State.INACTIVE))
if (_state.compareAndSet(State.FAILED, State.IDLE))
return true;
break;