Converted IteratingCallback to use SpinLock

This commit is contained in:
Greg Wilkins 2015-03-11 15:37:25 +11:00
parent 5456de2160
commit 870e0ab0b3
1 changed files with 119 additions and 153 deletions

View File

@ -21,6 +21,8 @@ package org.eclipse.jetty.util;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.thread.SpinLock;
/** /**
* This specialized callback implements a pattern that allows * This specialized callback implements a pattern that allows
* a large job to be broken into smaller tasks using iteration * a large job to be broken into smaller tasks using iteration
@ -92,12 +94,7 @@ public abstract class IteratingCallback implements Callback
/** /**
* This callback has been closed and cannot be reset. * This callback has been closed and cannot be reset.
*/ */
CLOSED, CLOSED
/**
* State is locked while leaving processing state to check the iterate boolean
*/
LOCKED
} }
/** /**
@ -125,18 +122,19 @@ public abstract class IteratingCallback implements Callback
SUCCEEDED SUCCEEDED
} }
private final AtomicReference<State> _state; private SpinLock _lock = new SpinLock();
private State _state;
private boolean _iterate; private boolean _iterate;
protected IteratingCallback() protected IteratingCallback()
{ {
_state = new AtomicReference<>(State.IDLE); _state = State.IDLE;
} }
protected IteratingCallback(boolean needReset) protected IteratingCallback(boolean needReset)
{ {
_state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.IDLE); _state = needReset ? State.SUCCEEDED : State.IDLE;
} }
/** /**
@ -183,43 +181,40 @@ public abstract class IteratingCallback implements Callback
*/ */
public void iterate() public void iterate()
{ {
boolean process=false;
loop: while (true) loop: while (true)
{ {
State state=_state.get(); try (SpinLock.Lock lock = _lock.lock())
switch (state)
{ {
case PENDING: switch (_state)
case CALLED: {
// process will be called when callback is handled case PENDING:
break loop; case CALLED:
// process will be called when callback is handled
case IDLE: break loop;
if (!_state.compareAndSet(state,State.PROCESSING))
continue;
processing();
break loop;
case PROCESSING:
if (!_state.compareAndSet(state,State.LOCKED))
continue;
// Tell the thread that is processing that it must iterate again
_iterate=true;
_state.set(State.PROCESSING);
break loop;
case LOCKED:
Thread.yield();
continue loop;
case FAILED: case IDLE:
case SUCCEEDED: _state=State.PROCESSING;
break loop; process=true;
break loop;
case CLOSED: case PROCESSING:
default: _iterate=true;
throw new IllegalStateException("state="+state); break loop;
case FAILED:
case SUCCEEDED:
break loop;
case CLOSED:
default:
throw new IllegalStateException("state="+_state);
}
} }
} }
if (process)
processing();
} }
private void processing() private void processing()
@ -227,6 +222,8 @@ public abstract class IteratingCallback implements Callback
// This should only ever be called when in processing state, however a failed or close call // This should only ever be called when in processing state, however a failed or close call
// may happen concurrently, so state is not assumed. // may happen concurrently, so state is not assumed.
boolean on_complete_success=false;
// While we are processing // While we are processing
processing: while (true) processing: while (true)
{ {
@ -242,13 +239,10 @@ public abstract class IteratingCallback implements Callback
break processing; break processing;
} }
// loop until we have successfully acted on the action we have just received // acted on the action we have just received
acting: while(true) try(SpinLock.Lock lock = _lock.lock())
{ {
// action handling needs to know the state switch (_state)
State state=_state.get();
switch (state)
{ {
case PROCESSING: case PROCESSING:
{ {
@ -256,67 +250,56 @@ public abstract class IteratingCallback implements Callback
{ {
case IDLE: case IDLE:
{ {
// lock the state
if (!_state.compareAndSet(state,State.LOCKED))
continue acting;
// Has iterate been called while we were processing? // Has iterate been called while we were processing?
if (_iterate) if (_iterate)
{ {
// yes, so skip idle and keep processing // yes, so skip idle and keep processing
_iterate=false; _iterate=false;
_state.set(State.PROCESSING); _state=State.PROCESSING;
continue processing; continue processing;
} }
// No, so we can go idle // No, so we can go idle
_state.set(State.IDLE); _state=State.IDLE;
break processing; break processing;
} }
case SCHEDULED: case SCHEDULED:
{ {
if (!_state.compareAndSet(state, State.PENDING))
continue acting;
// we won the race against the callback, so the callback has to process and we can break processing // we won the race against the callback, so the callback has to process and we can break processing
_state=State.PENDING;
break processing; break processing;
} }
case SUCCEEDED: case SUCCEEDED:
{ {
if (!_state.compareAndSet(state, State.LOCKED)) // we lost the race against the callback,
continue acting;
_iterate=false; _iterate=false;
_state.set(State.SUCCEEDED); _state=State.SUCCEEDED;
onCompleteSuccess(); on_complete_success=true;
break processing; break processing;
} }
default: default:
throw new IllegalStateException("state="+state+" action="+action); throw new IllegalStateException("state="+_state+" action="+action);
} }
} }
case CALLED: case CALLED:
{ {
switch (action) switch (action)
{ {
case SCHEDULED: case SCHEDULED:
{ {
if (!_state.compareAndSet(state, State.PROCESSING))
continue acting;
// we lost the race, so we have to keep processing // we lost the race, so we have to keep processing
_state=State.PROCESSING;
continue processing; continue processing;
} }
default: default:
throw new IllegalStateException("state="+state+" action="+action); throw new IllegalStateException("state="+_state+" action="+action);
} }
} }
case LOCKED:
Thread.yield();
continue acting;
case SUCCEEDED: case SUCCEEDED:
case FAILED: case FAILED:
@ -326,12 +309,15 @@ public abstract class IteratingCallback implements Callback
case IDLE: case IDLE:
case PENDING: case PENDING:
default: default:
throw new IllegalStateException("state="+state+" action="+action); throw new IllegalStateException("state="+_state+" action="+action);
} }
} }
} }
if (on_complete_success)
onCompleteSuccess();
} }
/** /**
* Invoked when the sub task succeeds. * Invoked when the sub task succeeds.
* Subclasses that override this method must always remember to call * Subclasses that override this method must always remember to call
@ -340,41 +326,36 @@ public abstract class IteratingCallback implements Callback
@Override @Override
public void succeeded() public void succeeded()
{ {
loop: while (true) boolean process=false;
try(SpinLock.Lock lock = _lock.lock())
{ {
State state = _state.get(); switch (_state)
switch (state)
{ {
case PROCESSING: case PROCESSING:
{ {
if (!_state.compareAndSet(state, State.CALLED)) _state=State.CALLED;
continue loop; break;
break loop;
} }
case PENDING: case PENDING:
{ {
if (!_state.compareAndSet(state, State.PROCESSING)) _state=State.PROCESSING;
continue loop; process=true;
processing(); break;
break loop;
} }
case CLOSED: case CLOSED:
case FAILED: case FAILED:
{ {
// Too late! // Too late!
break loop; break;
} }
case LOCKED:
{
Thread.yield();
continue loop;
}
default: default:
{ {
throw new IllegalStateException("state="+state); throw new IllegalStateException("state="+_state);
} }
} }
} }
if (process)
processing();
} }
/** /**
@ -385,73 +366,59 @@ public abstract class IteratingCallback implements Callback
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
loop: while (true) boolean failure=false;
try(SpinLock.Lock lock = _lock.lock())
{ {
State state = _state.get(); switch (_state)
switch (state)
{ {
case SUCCEEDED: case SUCCEEDED:
case FAILED: case FAILED:
case IDLE: case IDLE:
case CLOSED: case CLOSED:
case CALLED: case CALLED:
{
// too late!. // too late!.
break loop; break;
}
case LOCKED:
{
Thread.yield();
continue loop;
}
case PENDING: case PENDING:
case PROCESSING: case PROCESSING:
{ {
if (!_state.compareAndSet(state, State.FAILED)) _state=State.FAILED;
continue loop; failure=true;
break;
onCompleteFailure(x);
break loop;
} }
default: default:
throw new IllegalStateException("state="+state); throw new IllegalStateException("state="+_state);
} }
} }
if (failure)
onCompleteFailure(x);
} }
public void close() public void close()
{ {
loop: while (true) boolean failure=false;
try(SpinLock.Lock lock = _lock.lock())
{ {
State state = _state.get(); switch (_state)
switch (state)
{ {
case IDLE: case IDLE:
case SUCCEEDED: case SUCCEEDED:
case FAILED: case FAILED:
{ case PROCESSING:
if (!_state.compareAndSet(state, State.CLOSED)) _state=State.CLOSED;
continue loop; break;
break loop;
}
case CLOSED: case CLOSED:
{ break;
break loop;
}
case LOCKED:
{
Thread.yield();
continue loop;
}
default: default:
{ _state=State.CLOSED;
if (!_state.compareAndSet(state, State.CLOSED)) failure=true;
continue loop;
onCompleteFailure(new ClosedChannelException());
break loop;
}
} }
} }
if(failure)
onCompleteFailure(new ClosedChannelException());
} }
/* /*
@ -460,12 +427,18 @@ public abstract class IteratingCallback implements Callback
*/ */
boolean isIdle() boolean isIdle()
{ {
return _state.get() == State.IDLE; try(SpinLock.Lock lock = _lock.lock())
{
return _state == State.IDLE;
}
} }
public boolean isClosed() public boolean isClosed()
{ {
return _state.get() == State.CLOSED; try(SpinLock.Lock lock = _lock.lock())
{
return _state == State.CLOSED;
}
} }
/** /**
@ -473,7 +446,10 @@ public abstract class IteratingCallback implements Callback
*/ */
public boolean isFailed() public boolean isFailed()
{ {
return _state.get() == State.FAILED; try(SpinLock.Lock lock = _lock.lock())
{
return _state == State.FAILED;
}
} }
/** /**
@ -481,7 +457,10 @@ public abstract class IteratingCallback implements Callback
*/ */
public boolean isSucceeded() public boolean isSucceeded()
{ {
return _state.get() == State.SUCCEEDED; try(SpinLock.Lock lock = _lock.lock())
{
return _state == State.SUCCEEDED;
}
} }
/** /**
@ -494,31 +473,18 @@ public abstract class IteratingCallback implements Callback
*/ */
public boolean reset() public boolean reset()
{ {
while (true) try(SpinLock.Lock lock = _lock.lock())
{ {
State state=_state.get(); switch(_state)
switch(state)
{ {
case IDLE: case IDLE:
return true; return true;
case SUCCEEDED:
if (!_state.compareAndSet(state, State.LOCKED))
continue;
_iterate=false;
_state.set(State.IDLE);
return true;
case FAILED:
if (!_state.compareAndSet(state, State.LOCKED))
continue;
_iterate=false;
_state.set(State.IDLE);
return true;
case LOCKED: case SUCCEEDED:
Thread.yield(); case FAILED:
continue; _iterate=false;
_state=State.IDLE;
return true;
default: default:
return false; return false;