448156 Fixed INACTIVE race in IteratingCallback

More code cleanups and review
This commit is contained in:
Greg Wilkins 2014-10-22 22:43:24 +11:00
parent 679b8d30f8
commit 921381eb7c
1 changed files with 75 additions and 84 deletions

View File

@ -203,13 +203,13 @@ public abstract class IteratingCallback implements Callback
break loop;
case IDLE:
if (!_state.compareAndSet(State.IDLE,State.PROCESSING))
if (!_state.compareAndSet(state,State.PROCESSING))
continue;
processing();
break loop;
case PROCESSING:
if (!_state.compareAndSet(State.PROCESSING,State.LOCKED))
if (!_state.compareAndSet(state,State.LOCKED))
continue;
// Tell the thread that is processing that it must iterate again
_iterate=true;
@ -218,8 +218,13 @@ public abstract class IteratingCallback implements Callback
case LOCKED:
Thread.yield();
continue;
continue loop;
case FAILED:
case SUCCEEDED:
break loop;
case CLOSED:
default:
throw new IllegalStateException("state="+state);
}
@ -251,18 +256,17 @@ public abstract class IteratingCallback implements Callback
{
// action handling needs to know the state
State state=_state.get();
switch (action)
switch (state)
{
case IDLE:
case PROCESSING:
{
// We need to change to idle state, but may stay in processing if there
// is a concurrent call to iterate()
switch (state)
switch (action)
{
case PROCESSING:
case IDLE:
{
// lock the state
if (!_state.compareAndSet(State.PROCESSING,State.LOCKED))
if (!_state.compareAndSet(state,State.LOCKED))
continue acting;
// Has iterate been called while we were processing?
@ -278,83 +282,62 @@ public abstract class IteratingCallback implements Callback
_state.set(State.IDLE);
break processing;
}
case LOCKED:
case SCHEDULED:
{
Thread.yield();
continue;
}
default:
throw new IllegalStateException("state="+state);
}
}
case SCHEDULED:
{
// the call to process has scheduled a callback to succeeded() or failed()
// these callbacks are in a race with us changing to pending state. If we win the
// race, then the callback have to keep processing, otherwise if we lose the race
// we have to keep processing.
switch(state)
{
case PROCESSING:
{
if (!_state.compareAndSet(State.PROCESSING, State.PENDING))
if (!_state.compareAndSet(state, State.PENDING))
continue acting;
// we won the race, so the callback has to process and we can break processing
// we won the race against the callback, so the callback has to process and we can break processing
break processing;
}
case CALLED:
case SUCCEEDED:
{
if (!_state.compareAndSet(State.CALLED, State.PROCESSING))
if (!_state.compareAndSet(state, State.LOCKED))
continue acting;
_iterate=false;
_state.set(State.SUCCEEDED);
onCompleteSuccess();
break processing;
}
default:
throw new IllegalStateException("state="+state+" action="+action);
}
}
case CALLED:
{
switch (action)
{
case SCHEDULED:
{
if (!_state.compareAndSet(state, State.PROCESSING))
continue acting;
// we lost the race, so we have to keep processing
continue processing;
}
case LOCKED:
{
Thread.yield();
continue;
}
case FAILED:
case CLOSED:
{
break processing;
}
default:
throw new IllegalStateException("state="+state);
throw new IllegalStateException("state="+state+" action="+action);
}
}
case LOCKED:
Thread.yield();
continue acting;
case SUCCEEDED:
{
// process() has told us that there is no more work to do, so we need
// to switch to SUCCEEDED state. We can now ignore any concurrent calls
// to iterate() until reset() is called.
switch(state)
{
case SUCCEEDED:
case FAILED:
// Already complete!.
break processing;
case PROCESSING:
if (!_state.compareAndSet(State.PROCESSING, State.SUCCEEDED))
continue acting;
_iterate=false;
onCompleteSuccess();
break processing;
default:
throw new IllegalStateException("state="+state);
}
}
case FAILED:
case CLOSED:
break processing;
case IDLE:
case PENDING:
default:
throw new IllegalStateException("action="+action);
throw new IllegalStateException("state="+state+" action="+action);
}
}
}
}
@ -373,28 +356,31 @@ public abstract class IteratingCallback implements Callback
{
case PROCESSING:
{
if (!_state.compareAndSet(State.PROCESSING, State.CALLED))
if (!_state.compareAndSet(state, State.CALLED))
continue loop;
break loop;
}
case PENDING:
{
if (!_state.compareAndSet(State.PENDING, State.PROCESSING))
if (!_state.compareAndSet(state, State.PROCESSING))
continue loop;
processing();
break loop;
}
case CLOSED:
case FAILED:
{
// Too late!
break loop;
}
case LOCKED:
{
Thread.yield();
continue loop;
}
default:
{
throw new IllegalStateException(toString());
throw new IllegalStateException("state="+state);
}
}
}
@ -417,24 +403,27 @@ public abstract class IteratingCallback implements Callback
case FAILED:
case IDLE:
case CLOSED:
case CALLED:
{
// Already complete!.
// too late!.
break loop;
}
case LOCKED:
{
Thread.yield();
continue loop;
}
default:
}
case PENDING:
case PROCESSING:
{
if (!_state.compareAndSet(state, State.FAILED))
continue loop;
onCompleteFailure(x);
break loop;
}
default:
throw new IllegalStateException("state="+state);
}
}
}
@ -463,7 +452,6 @@ public abstract class IteratingCallback implements Callback
Thread.yield();
continue loop;
}
default:
{
if (!_state.compareAndSet(state, State.CLOSED))
@ -508,8 +496,8 @@ public abstract class IteratingCallback implements Callback
/**
* Resets this callback.
* <p/>
* A callback can only be reset to INACTIVE from the
* SUCCEEDED or FAILED states or if it is already INACTIVE.
* A callback can only be reset to IDLE from the
* SUCCEEDED or FAILED states or if it is already IDLE.
*
* @return true if the reset was successful
*/
@ -517,21 +505,24 @@ public abstract class IteratingCallback implements Callback
{
while (true)
{
switch(_state.get())
State state=_state.get();
switch(state)
{
case IDLE:
return true;
case SUCCEEDED:
if (!_state.compareAndSet(State.SUCCEEDED, State.IDLE))
if (!_state.compareAndSet(state, State.LOCKED))
continue;
_iterate=false;
_state.set(State.IDLE);
return true;
case FAILED:
if (!_state.compareAndSet(State.FAILED, State.IDLE))
if (!_state.compareAndSet(state, State.LOCKED))
continue;
_iterate=false;
_state.set(State.IDLE);
return true;
case LOCKED: