424043 - IteratingCallback Idle race.

A few renamings and fixes to avoid IllegalStateExceptions.
This commit is contained in:
Simone Bordet 2013-12-15 23:50:37 +01:00
parent 8bf4a4f263
commit 8d621a9331
8 changed files with 55 additions and 57 deletions

View File

@ -694,14 +694,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{ {
// There is more content to send // There is more content to send
sendContent(exchange, content, this); sendContent(exchange, content, this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
if (content.isConsumed()) if (content.isConsumed())
{ {
sendContent(exchange, content, lastCallback); sendContent(exchange, content, lastCallback);
return Action.EXECUTING; return Action.SCHEDULED;
} }
while (true) while (true)
@ -725,7 +725,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
LOG.debug("Deferred content available for {}", request); LOG.debug("Deferred content available for {}", request);
// TODO: this case is not covered by tests // TODO: this case is not covered by tests
sendContent(exchange, content, this); sendContent(exchange, content, this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
break; break;
} }

View File

@ -567,7 +567,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
else else
continue; continue;
return Action.EXECUTING; return Action.SCHEDULED;
} }
case SHUTDOWN_OUT: case SHUTDOWN_OUT:
{ {
@ -684,7 +684,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
else else
continue; continue;
return Action.EXECUTING; return Action.SCHEDULED;
} }
case SHUTDOWN_OUT: case SHUTDOWN_OUT:
{ {

View File

@ -763,14 +763,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
_flushed=true; _flushed=true;
write(_aggregate, false, this); write(_aggregate, false, this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
if (!_flushed) if (!_flushed)
{ {
_flushed=true; _flushed=true;
write(BufferUtil.EMPTY_BUFFER,false,this); write(BufferUtil.EMPTY_BUFFER,false,this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
return Action.SUCCEEDED; return Action.SUCCEEDED;
@ -813,7 +813,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
_completed=_len==0; _completed=_len==0;
write(_aggregate, _complete && _completed, this); write(_aggregate, _complete && _completed, this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
// Can we just aggregate the remainder? // Can we just aggregate the remainder?
@ -831,7 +831,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
_completed=true; _completed=true;
write(_buffer, _complete, this); write(_buffer, _complete, this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
// otherwise take a slice // otherwise take a slice
@ -843,7 +843,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_slice.position(p); _slice.position(p);
_completed=!_buffer.hasRemaining(); _completed=!_buffer.hasRemaining();
write(_slice, _complete && _completed, this); write(_slice, _complete && _completed, this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
// all content written, but if we have not yet signal completion, we // all content written, but if we have not yet signal completion, we
@ -854,7 +854,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
_completed=true; _completed=true;
write(BufferUtil.EMPTY_BUFFER, _complete, this); write(BufferUtil.EMPTY_BUFFER, _complete, this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
closed(); closed();
} }
@ -914,7 +914,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer.position(0); _buffer.position(0);
_buffer.limit(len); _buffer.limit(len);
write(_buffer,_eof,this); write(_buffer,_eof,this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
@Override @Override
@ -978,7 +978,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer.flip(); _buffer.flip();
write(_buffer,_eof,this); write(_buffer,_eof,this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
@Override @Override

View File

@ -324,7 +324,7 @@ public class GzipHttpOutput extends HttpOutput
addTrailer(); addTrailer();
superWrite(_buffer,complete,this); superWrite(_buffer,complete,this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
} }
@ -389,7 +389,7 @@ public class GzipHttpOutput extends HttpOutput
addTrailer(); addTrailer();
superWrite(_buffer,complete,this); superWrite(_buffer,complete,this);
return Action.EXECUTING; return Action.SCHEDULED;
} }
} }

View File

@ -213,13 +213,13 @@ public class Flusher
// MAX_GATHER parameter, and/or autotune the buffer returned // MAX_GATHER parameter, and/or autotune the buffer returned
// by FrameBytes.getByteBuffer() (see also comment there). // by FrameBytes.getByteBuffer() (see also comment there).
return Action.EXECUTING; return Action.SCHEDULED;
} }
@Override @Override
protected void completed() protected void completed()
{ {
// will never be called as process always returns EXECUTING or IDLE // will never be called as process always returns SCHEDULED or IDLE
throw new IllegalStateException(); throw new IllegalStateException();
} }

View File

@ -67,7 +67,7 @@ public abstract class IteratingCallback implements Callback
* a sub task, where the execution has started but the callback * a sub task, where the execution has started but the callback
* may have not yet been invoked. * may have not yet been invoked.
*/ */
EXECUTING, SCHEDULED,
/** /**
* Indicates that {@link #process()} has completed the overall job. * Indicates that {@link #process()} has completed the overall job.
*/ */
@ -88,7 +88,7 @@ public abstract class IteratingCallback implements Callback
* <ul> * <ul>
* <li>{@link Action#IDLE} when no sub tasks are available for execution * <li>{@link Action#IDLE} when no sub tasks are available for execution
* but the overall job is not completed yet</li> * but the overall job is not completed yet</li>
* <li>{@link Action#EXECUTING} when the sub task asynchronous execution * <li>{@link Action#SCHEDULED} when the sub task asynchronous execution
* has been started</li> * has been started</li>
* <li>{@link Action#SUCCEEDED} when the overall job is completed</li> * <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
* <li>{@link Action#FAILED} when the overall job cannot be completed</li> * <li>{@link Action#FAILED} when the overall job cannot be completed</li>
@ -123,7 +123,7 @@ public abstract class IteratingCallback implements Callback
{ {
case INACTIVE: case INACTIVE:
{ {
if (perform()) if (processIterations())
return; return;
break; break;
} }
@ -146,7 +146,7 @@ public abstract class IteratingCallback implements Callback
} }
} }
private boolean perform() throws Exception private boolean processIterations() throws Exception
{ {
// Keeps iterating as long as succeeded() is called during process(). // Keeps iterating as long as succeeded() is called during process().
// If we are in INACTIVE state, either this is the first iteration or // If we are in INACTIVE state, either this is the first iteration or
@ -157,7 +157,7 @@ public abstract class IteratingCallback implements Callback
// it is guarded by the CaS above. However, the case blocks below may // it is guarded by the CaS above. However, the case blocks below may
// be executed concurrently in this case: T1 calls process() which // be executed concurrently in this case: T1 calls process() which
// executes the asynchronous sub task, which calls succeeded(), which // executes the asynchronous sub task, which calls succeeded(), which
// moves the state into INACTIVE, then returns EXECUTING; T2 calls // moves the state into INACTIVE, then returns SCHEDULED; T2 calls
// iterate(), state is now INACTIVE and process() is called again and // iterate(), state is now INACTIVE and process() is called again and
// returns another action. Now we have 2 threads that may execute the // returns another action. Now we have 2 threads that may execute the
// action case blocks below concurrently; therefore each case block // action case blocks below concurrently; therefore each case block
@ -181,13 +181,13 @@ public abstract class IteratingCallback implements Callback
// State may have changed concurrently, try again. // State may have changed concurrently, try again.
continue; continue;
} }
case EXECUTING: case SCHEDULED:
{ {
// The sub task is executing, and the callback for it may or // The sub task is executing, and the callback for it may or
// may not have already been called yet, which we figure out below. // may not have already been called yet, which we figure out below.
// Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE. // Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE.
if (_state.compareAndSet(State.ITERATING, State.SCHEDULED) || if (_state.compareAndSet(State.ITERATING, State.ACTIVE) ||
_state.compareAndSet(State.ITERATE_AGAIN, State.SCHEDULED)) _state.compareAndSet(State.ITERATE_AGAIN, State.ACTIVE))
// Not called back yet, so wait. // Not called back yet, so wait.
return true; return true;
// Call back must have happened, so iterate. // Call back must have happened, so iterate.
@ -196,13 +196,13 @@ public abstract class IteratingCallback implements Callback
case SUCCEEDED: case SUCCEEDED:
{ {
// The overall job has completed. // The overall job has completed.
success(); if (completeSuccess())
completed(); completed();
return true; return true;
} }
case FAILED: case FAILED:
{ {
failure(); completeFailure();
return true; return true;
} }
default: default:
@ -234,18 +234,14 @@ public abstract class IteratingCallback implements Callback
return; return;
continue; continue;
} }
case SCHEDULED: case ACTIVE:
{ {
// If we can move from SCHEDULED to INACTIVE // If we can move from ACTIVE to INACTIVE
// then we continue in order to hit the INACTIVE // then we are responsible to call iterate().
// case, since we need to iterate() more.
if (_state.compareAndSet(current, State.INACTIVE)) if (_state.compareAndSet(current, State.INACTIVE))
continue; iterate();
continue; // If we can't CaS, then failed() must have been
} // called, and we just return.
case INACTIVE:
{
iterate();
return; return;
} }
default: default:
@ -264,34 +260,36 @@ public abstract class IteratingCallback implements Callback
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
failure(); completeFailure();
} }
private void success() private boolean completeSuccess()
{ {
while (true) while (true)
{ {
State current = _state.get(); State current = _state.get();
if (current == State.FAILED) if (current == State.FAILED)
{ {
throw new IllegalStateException(toString()); // Success arrived too late, sorry.
return false;
} }
else else
{ {
if (_state.compareAndSet(current, State.SUCCEEDED)) if (_state.compareAndSet(current, State.SUCCEEDED))
break; return true;
} }
} }
} }
private void failure() private void completeFailure()
{ {
while (true) while (true)
{ {
State current = _state.get(); State current = _state.get();
if (current == State.SUCCEEDED) if (current == State.SUCCEEDED)
{ {
throw new IllegalStateException(toString()); // Failed arrived too late, sorry.
return;
} }
else else
{ {
@ -342,10 +340,10 @@ public abstract class IteratingCallback implements Callback
INACTIVE, INACTIVE,
/** /**
* This callback is iterating and {@link #process()} has scheduled an * This callback is iterating and {@link #process()} has scheduled an
* asynchronous operation by returning {@link Action#EXECUTING}, but * asynchronous operation by returning {@link Action#SCHEDULED}, but
* the operation is still undergoing. * the operation is still undergoing.
*/ */
SCHEDULED, ACTIVE,
/** /**
* This callback is iterating and {@link #process()} has been called * This callback is iterating and {@link #process()} has been called
* but not returned yet. * but not returned yet.

View File

@ -60,7 +60,7 @@ public class IteratingCallbackTest
if (i-->1) if (i-->1)
{ {
succeeded(); // fake a completed IO operation succeeded(); // fake a completed IO operation
return Action.EXECUTING; return Action.SCHEDULED;
} }
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
@ -87,7 +87,7 @@ public class IteratingCallbackTest
if (i-->1) if (i-->1)
{ {
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS); scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS);
return Action.EXECUTING; return Action.SCHEDULED;
} }
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
@ -114,7 +114,7 @@ public class IteratingCallbackTest
if (i-->1) if (i-->1)
{ {
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS); scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS);
return Action.EXECUTING; return Action.SCHEDULED;
} }
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
@ -154,7 +154,7 @@ public class IteratingCallbackTest
succeeded(); // fake a completed IO operation succeeded(); // fake a completed IO operation
else else
failed(new Exception("testing")); failed(new Exception("testing"));
return Action.EXECUTING; return Action.SCHEDULED;
} }
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
@ -179,7 +179,7 @@ public class IteratingCallbackTest
if (i-->1) if (i-->1)
{ {
scheduler.schedule(i>2?successTask:failTask,50,TimeUnit.MILLISECONDS); scheduler.schedule(i>2?successTask:failTask,50,TimeUnit.MILLISECONDS);
return Action.EXECUTING; return Action.SCHEDULED;
} }
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
@ -210,11 +210,11 @@ public class IteratingCallbackTest
{ {
case 5: case 5:
succeeded(); succeeded();
return Action.EXECUTING; return Action.SCHEDULED;
case 4: case 4:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS); scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS);
return Action.EXECUTING; return Action.SCHEDULED;
case 3: case 3:
scheduler.schedule(new Runnable() scheduler.schedule(new Runnable()
@ -229,11 +229,11 @@ public class IteratingCallbackTest
case 2: case 2:
succeeded(); succeeded();
return Action.EXECUTING; return Action.SCHEDULED;
case 1: case 1:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS); scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS);
return Action.EXECUTING; return Action.SCHEDULED;
case 0: case 0:
return Action.SUCCEEDED; return Action.SUCCEEDED;

View File

@ -212,7 +212,7 @@ public class FrameFlusher
@Override @Override
protected void completed() protected void completed()
{ {
// will never be called as process always returns EXECUTING or IDLE // will never be called as process always returns SCHEDULED or IDLE
throw new IllegalStateException(); throw new IllegalStateException();
} }
@ -245,7 +245,7 @@ public class FrameFlusher
endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()])); endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear(); buffers.clear();
return Action.EXECUTING; return Action.SCHEDULED;
} }
@Override @Override