424043 - IteratingCallback Idle race.

Renamed Next enum to Action, and renamed some constant of the State
enum to avoid confusion with the Action enum.

Simplified succeeded() and failed(Throwable) code,
covering also cases not covered before.

Fixed case SCHEDULED in succeeded() that was returning in case the
compareAndSet failed.

Fixed race in perform(), where 2 threads may execute concurrently
after having returned from process().

Vastly improved javadocs and comments.
This commit is contained in:
Simone Bordet 2013-12-15 00:47:55 +01:00
parent 7141483356
commit 41fc2b8f87
8 changed files with 358 additions and 239 deletions

View File

@ -674,11 +674,11 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
private class ContentCallback extends IteratingCallback
{
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return Next.IDLE;
return Action.IDLE;
Request request = exchange.getRequest();
HttpContent content = HttpSender.this.content;
@ -687,21 +687,21 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (contentBuffer != null)
{
if (!someToContent(request, contentBuffer))
return Next.IDLE;
return Action.IDLE;
}
if (content.advance())
{
// There is more content to send
sendContent(exchange, content, this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
if (content.isConsumed())
{
sendContent(exchange, content, lastCallback);
return Next.SCHEDULED;
return Action.EXECUTING;
}
while (true)
@ -714,7 +714,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (updateSenderState(current, SenderState.IDLE))
{
LOG.debug("Waiting for deferred content for {}", request);
return Next.IDLE;
return Action.IDLE;
}
break;
}
@ -725,7 +725,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
LOG.debug("Deferred content available for {}", request);
// TODO: this case is not covered by tests
sendContent(exchange, content, this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
break;
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.server;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.http.HttpGenerator;
@ -38,7 +37,6 @@ import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -489,7 +487,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
@Override
public Next process() throws Exception
public Action process() throws Exception
{
ByteBuffer chunk = _chunk;
while (true)
@ -569,7 +567,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
else
continue;
return Next.SCHEDULED;
return Action.EXECUTING;
}
case SHUTDOWN_OUT:
{
@ -584,7 +582,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array())
_bufferPool.release(_header);
}
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
case CONTINUE:
{
@ -641,7 +639,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
@Override
public Next process() throws Exception
public Action process() throws Exception
{
ByteBuffer chunk = _chunk;
while (true)
@ -686,7 +684,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
else
continue;
return Next.SCHEDULED;
return Action.EXECUTING;
}
case SHUTDOWN_OUT:
{
@ -695,7 +693,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
case DONE:
{
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
case CONTINUE:
{

View File

@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritePendingException;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
@ -758,23 +757,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
protected Next process()
protected Action process()
{
if (BufferUtil.hasContent(_aggregate))
{
_flushed=true;
write(_aggregate, false, this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
if (!_flushed)
{
_flushed=true;
write(BufferUtil.EMPTY_BUFFER,false,this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
}
@ -807,21 +806,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
protected Next process()
protected Action process()
{
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
_completed=_len==0;
write(_aggregate, _complete && _completed, this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
// Can we just aggregate the remainder?
if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
{
BufferUtil.put(_buffer,_aggregate);
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
// Is there data left to write?
@ -832,7 +831,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
_completed=true;
write(_buffer, _complete, this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
// otherwise take a slice
@ -844,7 +843,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_slice.position(p);
_completed=!_buffer.hasRemaining();
write(_slice, _complete && _completed, this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
// all content written, but if we have not yet signal completion, we
@ -855,12 +854,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
_completed=true;
write(BufferUtil.EMPTY_BUFFER, _complete, this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
closed();
}
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
}
@ -887,7 +886,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
@ -897,7 +896,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
// Read until buffer full or EOF
@ -915,7 +914,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer.position(0);
_buffer.limit(len);
write(_buffer,_eof,this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
@Override
@ -958,7 +957,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
@ -967,7 +966,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
// Read from stream until buffer full or EOF
@ -979,7 +978,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer.flip();
write(_buffer,_eof,this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
@Override

View File

@ -297,7 +297,7 @@ public class GzipHttpOutput extends HttpOutput
}
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
if (_deflater.needsInput())
{
@ -307,11 +307,11 @@ public class GzipHttpOutput extends HttpOutput
_deflater=null;
getHttpChannel().getByteBufferPool().release(_buffer);
_buffer=null;
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
if (!_complete)
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
BufferUtil.compact(_buffer);
@ -324,7 +324,7 @@ public class GzipHttpOutput extends HttpOutput
addTrailer();
superWrite(_buffer,complete,this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
}
@ -342,7 +342,7 @@ public class GzipHttpOutput extends HttpOutput
}
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
if (_deflater.needsInput())
{
@ -354,11 +354,11 @@ public class GzipHttpOutput extends HttpOutput
_deflater=null;
getHttpChannel().getByteBufferPool().release(_buffer);
_buffer=null;
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
if (!_complete)
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
else
{
@ -389,7 +389,7 @@ public class GzipHttpOutput extends HttpOutput
addTrailer();
superWrite(_buffer,complete,this);
return Next.SCHEDULED;
return Action.EXECUTING;
}
}

View File

@ -143,7 +143,7 @@ public class Flusher
private final Set<IStream> stalled = new HashSet<>();
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
synchronized (lock)
{
@ -194,7 +194,7 @@ public class Flusher
}
if (active.size() == 0)
return Next.IDLE;
return Action.IDLE;
// Get the bytes to write
ByteBuffer[] buffers = new ByteBuffer[active.size()];
@ -213,13 +213,13 @@ public class Flusher
// MAX_GATHER parameter, and/or autotune the buffer returned
// by FrameBytes.getByteBuffer() (see also comment there).
return Next.SCHEDULED;
return Action.EXECUTING;
}
@Override
protected void completed()
{
// will never be called as process always returns SCHEDULED or IDLE
// will never be called as process always returns EXECUTING or IDLE
throw new IllegalStateException();
}

View File

@ -20,228 +20,350 @@ package org.eclipse.jetty.util;
import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */
/** Iterating Callback.
* <p>This specialized callback implements a pattern that allows a
* large job to be broken into smaller tasks using iteration rather than
* recursion.
* <p>
* A typical pattern used with asynchronous callbacks, is the next IO is
* done from within the scope of the success callback. The problem with this
* is that if the callback thread is the same one that calls to IO instruction,
* then recursion results and eventually a dispatch has to be done to
* avoid stack overflow (see {@link ForkInvoker}).
* <p>To avoid this issue, this callback uses an AtomicReference to note
* if the success callback has been called during the processing of a
* sub task, and if so then the processing iterates rather than recurses.
* </p>
* <p>This callback is passed to the asynchronous handling of each sub
* task and a call the {@link #succeeded()} on this call back represents
* completion of the subtask. Only once all the subtasks are completed is
* the {#completed()} method called.</p>
*
/**
* This specialized callback implements a pattern that allows
* a large job to be broken into smaller tasks using iteration
* rather than recursion.
* <p/>
* A typical example is the write of a large content to a socket,
* divided in chunks. Chunk C1 is written by thread T1, which
* also invokes the callback, which writes chunk C2, which invokes
* the callback again, which writes chunk C3, and so forth.
* <p/>
* The problem with the example is that if the callback thread
* is the same that performs the I/O operation, then the process
* is recursive and may result in a stack overflow.
* To avoid the stack overflow, a thread dispatch must be performed,
* causing context switching and cache misses, affecting performance.
* <p/>
* To avoid this issue, this callback uses an AtomicReference to
* record whether success callback has been called during the processing
* of a sub task, and if so then the processing iterates rather than
* recurring.
* <p/>
* Subclasses must implement method {@link #process()} where the sub
* task is executed and a suitable {@link IteratingCallback.Action} is
* returned to this callback to indicate the overall progress of the job.
* This callback is passed to the asynchronous execution of each sub
* task and a call the {@link #succeeded()} on this callback represents
* the completion of the sub task.
*/
public abstract class IteratingCallback implements Callback
{
protected enum Next { IDLE, SCHEDULED, SUCCEEDED, FAILED };
protected enum State { IDLE, SCHEDULED, ITERATING, ITERATE_AGAIN, SUCCEEDED, FAILED };
private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
public IteratingCallback()
{
}
abstract protected void completed();
/**
* TODO - FIX
* Method called by iterate to process the task.
* @return Then next state:
* <dl>
* <dt>SUCCEEDED</dt><dd>Return if the total task has completed</dd>
* <dt>SCHEDULED</dt><dd>This callback has been scheduled and {@link #succeeded()} or {@link #failed(Throwable)} will eventually be called (if they have not been called already!)</dd>
* <dt>IDLE</dt><dd>no progress can be made and another call to {@link #iterate()} is required in order to progress the task</dd>
* <dt>FAILED</dt><dd>processing has failed</dd>
* </dl>
*
* @throws Exception
* The indication of the overall progress of the overall job that
* implementations of {@link #process()} must return.
*/
abstract protected Next process() throws Exception;
/* ------------------------------------------------------------ */
/** This method is called initially to start processing and
* is then called by subsequent sub task success to continue
* processing. If {@link #process()} returns IDLE, then iterate should be called
* again to restart processing.
* It is safe to call iterate multiple times as only the first thread to move
* the state out of IDLE will actually do any iteration and processing.
protected enum Action
{
/**
* Indicates that {@link #process()} has no more work to do,
* but the overall job is not completed yet, probably waiting
* for additional events to trigger more work.
*/
IDLE,
/**
* Indicates that {@link #process()} is executing asynchronously
* a sub task, where the execution has started but the callback
* may have not yet been invoked.
*/
EXECUTING,
/**
* Indicates that {@link #process()} has completed the overall job.
*/
SUCCEEDED,
/**
* Indicates that {@link #process()} has failed the overall job.
*/
FAILED
}
private final AtomicReference<State> _state = new AtomicReference<>(State.INACTIVE);
/**
* Method called by {@link #iterate()} to process the sub task.
* <p/>
* Implementations must start the asynchronous execution of the sub task
* (if any) and return an appropriate action:
* <ul>
* <li>{@link Action#IDLE} when no sub tasks are available for execution
* but the overall job is not completed yet</li>
* <li>{@link Action#EXECUTING} when the sub task asynchronous execution
* has been started</li>
* <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
* <li>{@link Action#FAILED} when the overall job cannot be completed</li>
* </ul>
*
* @throws Exception if the sub task processing throws
*/
protected abstract Action process() throws Exception;
/**
* Invoked when the overall task has completed successfully.
*/
protected abstract void completed();
/**
* This method must be invoked by applications to start the processing
* of sub tasks.
* <p/>
* If {@link #process()} returns {@link Action#IDLE}, then this method
* should be called again to restart processing.
* It is safe to call iterate multiple times from multiple threads since only
* the first thread to move the state out of INACTIVE will actually do any iteration
* and processing.
*/
public void iterate()
{
try
{
while(true)
while (true)
{
switch (_state.get())
{
case IDLE:
// Keep iterating as long as succeeded() is called during process()
// If we are in WAITING state, either this is the first iteration or
// succeeded()/failed() were called already.
while(_state.compareAndSet(State.IDLE,State.ITERATING))
{
Next next = process();
switch (next)
{
case SUCCEEDED:
// The task has complete, there should have been no callbacks
// Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE
if (!_state.compareAndSet(State.ITERATING,State.SUCCEEDED)&&
!_state.compareAndSet(State.ITERATE_AGAIN,State.SUCCEEDED))
throw new IllegalStateException("state="+_state.get());
completed();
return;
case SCHEDULED:
// This callback has been scheduled, so it may or may not have
// already been called back. Let's find out
// Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE
if (_state.compareAndSet(State.ITERATING,State.SCHEDULED) ||
_state.compareAndSet(State.ITERATE_AGAIN,State.SCHEDULED))
// not called back yet, so lets wait for it
return;
// call back must have happened, so lets iterate
continue;
case IDLE:
// No more progress can be made by this call to iterate
if (_state.compareAndSet(State.ITERATING,State.IDLE))
return;
// was iterate called again since we already decided to go IDLE?
if (_state.compareAndSet(State.ITERATE_AGAIN,State.IDLE))
continue; // Try another iteration as more work may have been added while previous process was returning
throw new IllegalStateException("state="+_state.get());
case FAILED:
_state.set(State.FAILED);
return;
default:
throw new IllegalStateException("state="+_state.get()+" next="+next);
}
}
break;
case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.ITERATE_AGAIN))
case INACTIVE:
{
if (perform())
return;
break;
}
case ITERATING:
{
if (_state.compareAndSet(State.ITERATING, State.ITERATE_AGAIN))
return;
break;
}
default:
{
return;
}
}
}
}
catch(Exception e)
catch (Throwable x)
{
failed(e);
failed(x);
}
}
/* ------------------------------------------------------------ */
private boolean perform() throws Exception
{
// Keeps iterating as long as succeeded() is called during process().
// If we are in INACTIVE state, either this is the first iteration or
// succeeded()/failed() were called already.
while (_state.compareAndSet(State.INACTIVE, State.ITERATING))
{
// Method process() can only be called by one thread at a time because
// it is guarded by the CaS above. However, the case blocks below may
// be executed concurrently in this case: T1 calls process() which
// executes the asynchronous sub task, which calls succeeded(), which
// moves the state into INACTIVE, then returns EXECUTING; T2 calls
// iterate(), state is now INACTIVE and process() is called again and
// returns another action. Now we have 2 threads that may execute the
// action case blocks below concurrently; therefore each case block
// has to be prepared to fail the CaS it's doing.
Action action = process();
switch (action)
{
case IDLE:
{
// No more progress can be made.
if (_state.compareAndSet(State.ITERATING, State.INACTIVE))
return true;
// Was iterate() called again since we already decided to go INACTIVE ?
// If so, try another iteration as more work may have been added
// while the previous call to process() was returning.
if (_state.compareAndSet(State.ITERATE_AGAIN, State.INACTIVE))
continue;
// State may have changed concurrently, try again.
continue;
}
case EXECUTING:
{
// The sub task is executing, and the callback for it may or
// may not have already been called yet, which we figure out below.
// Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE.
if (_state.compareAndSet(State.ITERATING, State.SCHEDULED) ||
_state.compareAndSet(State.ITERATE_AGAIN, State.SCHEDULED))
// Not called back yet, so wait.
return true;
// Call back must have happened, so iterate.
continue;
}
case SUCCEEDED:
{
// The overall job has completed.
success();
completed();
return true;
}
case FAILED:
{
failure();
return true;
}
default:
{
throw new IllegalStateException(toString());
}
}
}
return false;
}
/**
* Invoked when the sub task succeeds.
* Subclasses that override this method must always remember to call
* {@code super.succeeded()}.
*/
@Override
public void succeeded()
{
// Try a short cut for the fast method. If we are still iterating
if (_state.compareAndSet(State.ITERATING,State.IDLE))
// then next loop will continue processing, so nothing to do here
return;
// OK do it properly
loop: while(true)
while (true)
{
switch(_state.get())
State current = _state.get();
switch (current)
{
case ITERATE_AGAIN:
if (_state.compareAndSet(State.ITERATE_AGAIN,State.IDLE))
break loop;
continue;
case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.IDLE))
break loop;
{
if (_state.compareAndSet(current, State.INACTIVE))
return;
continue;
}
case SCHEDULED:
if (_state.compareAndSet(State.SCHEDULED,State.IDLE))
iterate();
break loop;
case IDLE:
// TODO - remove this once old ICB usages updated
{
// If we can move from SCHEDULED to INACTIVE
// then we continue in order to hit the INACTIVE
// case, since we need to iterate() more.
if (_state.compareAndSet(current, State.INACTIVE))
continue;
continue;
}
case INACTIVE:
{
iterate();
break loop;
return;
}
default:
throw new IllegalStateException(this+" state="+_state.get());
{
throw new IllegalStateException(toString());
}
}
}
}
/* ------------------------------------------------------------ */
/**
* Derivations of this method should always call super.failed(x)
* to check the state before handling the failure.
* @see org.eclipse.jetty.util.Callback#failed(java.lang.Throwable)
/**
* Invoked when the sub task fails.
* Subclasses that override this method must always remember to call
* {@code super.failed(Throwable)}.
*/
@Override
public void failed(Throwable x)
{
loop: while(true)
{
switch(_state.get())
{
case ITERATE_AGAIN:
if (_state.compareAndSet(State.ITERATE_AGAIN,State.FAILED))
break loop;
continue;
case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.FAILED))
break loop;
continue;
case SCHEDULED:
if (_state.compareAndSet(State.SCHEDULED,State.FAILED))
break loop;
continue;
case IDLE:
// TODO - remove this once old ICB usages updated
if (_state.compareAndSet(State.IDLE,State.FAILED))
break loop;
continue;
failure();
}
default:
throw new IllegalStateException("state="+_state.get(),x);
private void success()
{
while (true)
{
State current = _state.get();
if (current == State.FAILED)
{
throw new IllegalStateException(toString());
}
else
{
if (_state.compareAndSet(current, State.SUCCEEDED))
break;
}
}
}
private void failure()
{
while (true)
{
State current = _state.get();
if (current == State.SUCCEEDED)
{
throw new IllegalStateException(toString());
}
else
{
if (_state.compareAndSet(current, State.FAILED))
break;
}
}
}
/**
* @return whether this callback is idle and {@link #iterate()} needs to be called
*/
public boolean isIdle()
{
return _state.get()==State.IDLE;
return _state.get() == State.INACTIVE;
}
/**
* @return whether this callback has failed
*/
public boolean isFailed()
{
return _state.get()==State.FAILED;
return _state.get() == State.FAILED;
}
/**
* @return whether this callback has succeeded
*/
public boolean isSucceeded()
{
return _state.get()==State.SUCCEEDED;
return _state.get() == State.SUCCEEDED;
}
@Override
public String toString()
{
return String.format("%s[%s]", super.toString(), _state);
}
/**
* The internal states of this callback
*/
private enum State
{
/**
* This callback is inactive, ready to iterate.
*/
INACTIVE,
/**
* This callback is iterating and {@link #process()} has scheduled an
* asynchronous operation by returning {@link Action#EXECUTING}, but
* the operation is still undergoing.
*/
SCHEDULED,
/**
* This callback is iterating and {@link #process()} has been called
* but not returned yet.
*/
ITERATING,
/**
* While this callback was iterating, another request for iteration
* has been issued, so the iteration must continue even if a previous
* call to {@link #process()} returned {@link Action#IDLE}.
*/
ITERATE_AGAIN,
/**
* The overall job has succeeded.
*/
SUCCEEDED,
/**
* The overall job has failed.
*/
FAILED
}
}

View File

@ -54,15 +54,15 @@ public class IteratingCallbackTest
int i=10;
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
processed++;
if (i-->1)
{
succeeded(); // fake a completed IO operation
return Next.SCHEDULED;
return Action.EXECUTING;
}
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
};
@ -81,15 +81,15 @@ public class IteratingCallbackTest
int i=4;
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
processed++;
if (i-->1)
{
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS);
return Next.SCHEDULED;
return Action.EXECUTING;
}
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
};
@ -108,15 +108,15 @@ public class IteratingCallbackTest
int i=4;
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
processed++;
if (i-->1)
{
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS);
return Next.SCHEDULED;
return Action.EXECUTING;
}
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
};
@ -145,7 +145,7 @@ public class IteratingCallbackTest
int i=10;
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
processed++;
if (i-->1)
@ -154,9 +154,9 @@ public class IteratingCallbackTest
succeeded(); // fake a completed IO operation
else
failed(new Exception("testing"));
return Next.SCHEDULED;
return Action.EXECUTING;
}
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
};
@ -173,15 +173,15 @@ public class IteratingCallbackTest
int i=4;
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
processed++;
if (i-->1)
{
scheduler.schedule(i>2?successTask:failTask,50,TimeUnit.MILLISECONDS);
return Next.SCHEDULED;
return Action.EXECUTING;
}
return Next.SUCCEEDED;
return Action.SUCCEEDED;
}
};
@ -202,7 +202,7 @@ public class IteratingCallbackTest
int i=5;
@Override
protected Next process()
protected Action process()
{
processed++;
@ -210,11 +210,11 @@ public class IteratingCallbackTest
{
case 5:
succeeded();
return Next.SCHEDULED;
return Action.EXECUTING;
case 4:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS);
return Next.SCHEDULED;
return Action.EXECUTING;
case 3:
scheduler.schedule(new Runnable()
@ -225,18 +225,18 @@ public class IteratingCallbackTest
idle.countDown();
}
},5,TimeUnit.MILLISECONDS);
return Next.IDLE;
return Action.IDLE;
case 2:
succeeded();
return Next.SCHEDULED;
return Action.EXECUTING;
case 1:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS);
return Next.SCHEDULED;
return Action.EXECUTING;
case 0:
return Next.SUCCEEDED;
return Action.SUCCEEDED;
default:
throw new IllegalStateException();

View File

@ -212,12 +212,12 @@ public class FrameFlusher
@Override
protected void completed()
{
// will never be called as process always returns SCHEDULED or IDLE
// will never be called as process always returns EXECUTING or IDLE
throw new IllegalStateException();
}
@Override
protected Next process() throws Exception
protected Action process() throws Exception
{
synchronized (lock)
{
@ -241,11 +241,11 @@ public class FrameFlusher
}
if (buffers.size()==0)
return Next.IDLE;
return Action.IDLE;
endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
return Next.SCHEDULED;
return Action.EXECUTING;
}
@Override