424043 - IteratingCallback Idle race

This commit is contained in:
Greg Wilkins 2013-12-14 09:14:12 +11:00
parent 46ef022cf4
commit 7141483356
8 changed files with 133 additions and 99 deletions

View File

@ -674,11 +674,11 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
private class ContentCallback extends IteratingCallback private class ContentCallback extends IteratingCallback
{ {
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
HttpExchange exchange = getHttpExchange(); HttpExchange exchange = getHttpExchange();
if (exchange == null) if (exchange == null)
return State.IDLE; return Next.IDLE;
Request request = exchange.getRequest(); Request request = exchange.getRequest();
HttpContent content = HttpSender.this.content; HttpContent content = HttpSender.this.content;
@ -687,21 +687,21 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (contentBuffer != null) if (contentBuffer != null)
{ {
if (!someToContent(request, contentBuffer)) if (!someToContent(request, contentBuffer))
return State.IDLE; return Next.IDLE;
} }
if (content.advance()) if (content.advance())
{ {
// There is more content to send // There is more content to send
sendContent(exchange, content, this); sendContent(exchange, content, this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
if (content.isConsumed()) if (content.isConsumed())
{ {
sendContent(exchange, content, lastCallback); sendContent(exchange, content, lastCallback);
return State.SCHEDULED; return Next.SCHEDULED;
} }
while (true) while (true)
@ -714,7 +714,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (updateSenderState(current, SenderState.IDLE)) if (updateSenderState(current, SenderState.IDLE))
{ {
LOG.debug("Waiting for deferred content for {}", request); LOG.debug("Waiting for deferred content for {}", request);
return State.IDLE; return Next.IDLE;
} }
break; break;
} }
@ -725,7 +725,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
LOG.debug("Deferred content available for {}", request); LOG.debug("Deferred content available for {}", request);
// TODO: this case is not covered by tests // TODO: this case is not covered by tests
sendContent(exchange, content, this); sendContent(exchange, content, this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
break; break;
} }

View File

@ -489,7 +489,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
@Override @Override
public State process() throws Exception public Next process() throws Exception
{ {
ByteBuffer chunk = _chunk; ByteBuffer chunk = _chunk;
while (true) while (true)
@ -569,7 +569,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
else else
continue; continue;
return State.SCHEDULED; return Next.SCHEDULED;
} }
case SHUTDOWN_OUT: case SHUTDOWN_OUT:
{ {
@ -584,7 +584,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array()) if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array())
_bufferPool.release(_header); _bufferPool.release(_header);
} }
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
case CONTINUE: case CONTINUE:
{ {
@ -641,7 +641,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
@Override @Override
public State process() throws Exception public Next process() throws Exception
{ {
ByteBuffer chunk = _chunk; ByteBuffer chunk = _chunk;
while (true) while (true)
@ -686,7 +686,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
else else
continue; continue;
return State.SCHEDULED; return Next.SCHEDULED;
} }
case SHUTDOWN_OUT: case SHUTDOWN_OUT:
{ {
@ -695,7 +695,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
case DONE: case DONE:
{ {
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
case CONTINUE: case CONTINUE:
{ {

View File

@ -758,23 +758,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
@Override @Override
protected State process() protected Next process()
{ {
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
_flushed=true; _flushed=true;
write(_aggregate, false, this); write(_aggregate, false, this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
if (!_flushed) if (!_flushed)
{ {
_flushed=true; _flushed=true;
write(BufferUtil.EMPTY_BUFFER,false,this); write(BufferUtil.EMPTY_BUFFER,false,this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
} }
@ -807,21 +807,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
@Override @Override
protected State process() protected Next process()
{ {
// flush any content from the aggregate // flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
{ {
_completed=_len==0; _completed=_len==0;
write(_aggregate, _complete && _completed, this); write(_aggregate, _complete && _completed, this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
// Can we just aggregate the remainder? // Can we just aggregate the remainder?
if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize) if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
{ {
BufferUtil.put(_buffer,_aggregate); BufferUtil.put(_buffer,_aggregate);
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
// Is there data left to write? // Is there data left to write?
@ -832,7 +832,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
_completed=true; _completed=true;
write(_buffer, _complete, this); write(_buffer, _complete, this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
// otherwise take a slice // otherwise take a slice
@ -844,7 +844,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_slice.position(p); _slice.position(p);
_completed=!_buffer.hasRemaining(); _completed=!_buffer.hasRemaining();
write(_slice, _complete && _completed, this); write(_slice, _complete && _completed, this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
// all content written, but if we have not yet signal completion, we // all content written, but if we have not yet signal completion, we
@ -855,12 +855,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
_completed=true; _completed=true;
write(BufferUtil.EMPTY_BUFFER, _complete, this); write(BufferUtil.EMPTY_BUFFER, _complete, this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
closed(); closed();
} }
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
} }
@ -887,7 +887,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
// Only return if EOF has previously been read and thus // Only return if EOF has previously been read and thus
// a write done with EOF=true // a write done with EOF=true
@ -897,7 +897,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_in.close(); _in.close();
closed(); closed();
_channel.getByteBufferPool().release(_buffer); _channel.getByteBufferPool().release(_buffer);
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
// Read until buffer full or EOF // Read until buffer full or EOF
@ -915,7 +915,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer.position(0); _buffer.position(0);
_buffer.limit(len); _buffer.limit(len);
write(_buffer,_eof,this); write(_buffer,_eof,this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
@Override @Override
@ -958,7 +958,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
// Only return if EOF has previously been read and thus // Only return if EOF has previously been read and thus
// a write done with EOF=true // a write done with EOF=true
@ -967,7 +967,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_in.close(); _in.close();
closed(); closed();
_channel.getByteBufferPool().release(_buffer); _channel.getByteBufferPool().release(_buffer);
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
// Read from stream until buffer full or EOF // Read from stream until buffer full or EOF
@ -979,7 +979,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer.flip(); _buffer.flip();
write(_buffer,_eof,this); write(_buffer,_eof,this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
@Override @Override

View File

@ -297,7 +297,7 @@ public class GzipHttpOutput extends HttpOutput
} }
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
if (_deflater.needsInput()) if (_deflater.needsInput())
{ {
@ -307,11 +307,11 @@ public class GzipHttpOutput extends HttpOutput
_deflater=null; _deflater=null;
getHttpChannel().getByteBufferPool().release(_buffer); getHttpChannel().getByteBufferPool().release(_buffer);
_buffer=null; _buffer=null;
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
if (!_complete) if (!_complete)
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
BufferUtil.compact(_buffer); BufferUtil.compact(_buffer);
@ -324,7 +324,7 @@ public class GzipHttpOutput extends HttpOutput
addTrailer(); addTrailer();
superWrite(_buffer,complete,this); superWrite(_buffer,complete,this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
} }
@ -342,7 +342,7 @@ public class GzipHttpOutput extends HttpOutput
} }
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
if (_deflater.needsInput()) if (_deflater.needsInput())
{ {
@ -354,11 +354,11 @@ public class GzipHttpOutput extends HttpOutput
_deflater=null; _deflater=null;
getHttpChannel().getByteBufferPool().release(_buffer); getHttpChannel().getByteBufferPool().release(_buffer);
_buffer=null; _buffer=null;
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
if (!_complete) if (!_complete)
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
else else
{ {
@ -389,7 +389,7 @@ public class GzipHttpOutput extends HttpOutput
addTrailer(); addTrailer();
superWrite(_buffer,complete,this); superWrite(_buffer,complete,this);
return State.SCHEDULED; return Next.SCHEDULED;
} }
} }

View File

@ -143,7 +143,7 @@ public class Flusher
private final Set<IStream> stalled = new HashSet<>(); private final Set<IStream> stalled = new HashSet<>();
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
synchronized (lock) synchronized (lock)
{ {
@ -194,7 +194,7 @@ public class Flusher
} }
if (active.size() == 0) if (active.size() == 0)
return State.IDLE; return Next.IDLE;
// Get the bytes to write // Get the bytes to write
ByteBuffer[] buffers = new ByteBuffer[active.size()]; ByteBuffer[] buffers = new ByteBuffer[active.size()];
@ -213,7 +213,7 @@ public class Flusher
// MAX_GATHER parameter, and/or autotune the buffer returned // MAX_GATHER parameter, and/or autotune the buffer returned
// by FrameBytes.getByteBuffer() (see also comment there). // by FrameBytes.getByteBuffer() (see also comment there).
return State.SCHEDULED; return Next.SCHEDULED;
} }
@Override @Override

View File

@ -44,7 +44,8 @@ import java.util.concurrent.atomic.AtomicReference;
*/ */
public abstract class IteratingCallback implements Callback public abstract class IteratingCallback implements Callback
{ {
protected enum State { IDLE, SCHEDULED, ITERATING, SUCCEEDED, FAILED }; protected enum Next { IDLE, SCHEDULED, SUCCEEDED, FAILED };
protected enum State { IDLE, SCHEDULED, ITERATING, ITERATE_AGAIN, SUCCEEDED, FAILED };
private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE); private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
public IteratingCallback() public IteratingCallback()
@ -66,7 +67,7 @@ public abstract class IteratingCallback implements Callback
* *
* @throws Exception * @throws Exception
*/ */
abstract protected State process() throws Exception; abstract protected Next process() throws Exception;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -81,42 +82,65 @@ public abstract class IteratingCallback implements Callback
{ {
try try
{ {
// Keep iterating as long as succeeded() is called during process() while(true)
// If we are in WAITING state, either this is the first iteration or
// succeeded()/failed() were called already.
while(_state.compareAndSet(State.IDLE,State.ITERATING))
{ {
State next = process(); switch (_state.get())
switch (next)
{ {
case SUCCEEDED:
// The task has complete, there should have been no callbacks
if (!_state.compareAndSet(State.ITERATING,State.SUCCEEDED))
throw new IllegalStateException("state="+_state.get());
completed();
return;
case SCHEDULED:
// This callback has been scheduled, so it may or may not have
// already been called back. Let's find out
if (_state.compareAndSet(State.ITERATING,State.SCHEDULED))
// not called back yet, so lets wait for it
return;
// call back must have happened, so lets iterate
continue;
case IDLE: case IDLE:
// No more progress can be made. Wait for another call to iterate // Keep iterating as long as succeeded() is called during process()
if (!_state.compareAndSet(State.ITERATING,State.IDLE)) // If we are in WAITING state, either this is the first iteration or
throw new IllegalStateException("state="+_state.get()); // succeeded()/failed() were called already.
return; while(_state.compareAndSet(State.IDLE,State.ITERATING))
{
case FAILED: Next next = process();
_state.set(State.FAILED); switch (next)
return; {
case SUCCEEDED:
// The task has complete, there should have been no callbacks
// Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE
if (!_state.compareAndSet(State.ITERATING,State.SUCCEEDED)&&
!_state.compareAndSet(State.ITERATE_AGAIN,State.SUCCEEDED))
throw new IllegalStateException("state="+_state.get());
completed();
return;
case SCHEDULED:
// This callback has been scheduled, so it may or may not have
// already been called back. Let's find out
// Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE
if (_state.compareAndSet(State.ITERATING,State.SCHEDULED) ||
_state.compareAndSet(State.ITERATE_AGAIN,State.SCHEDULED))
// not called back yet, so lets wait for it
return;
// call back must have happened, so lets iterate
continue;
case IDLE:
// No more progress can be made by this call to iterate
if (_state.compareAndSet(State.ITERATING,State.IDLE))
return;
// was iterate called again since we already decided to go IDLE?
if (_state.compareAndSet(State.ITERATE_AGAIN,State.IDLE))
continue; // Try another iteration as more work may have been added while previous process was returning
throw new IllegalStateException("state="+_state.get());
case FAILED:
_state.set(State.FAILED);
return;
default:
throw new IllegalStateException("state="+_state.get()+" next="+next);
}
}
break;
case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.ITERATE_AGAIN))
return;
break;
default: default:
throw new IllegalStateException("state="+_state.get()+" next="+next); return;
} }
} }
} }
@ -140,6 +164,11 @@ public abstract class IteratingCallback implements Callback
{ {
switch(_state.get()) switch(_state.get())
{ {
case ITERATE_AGAIN:
if (_state.compareAndSet(State.ITERATE_AGAIN,State.IDLE))
break loop;
continue;
case ITERATING: case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.IDLE)) if (_state.compareAndSet(State.ITERATING,State.IDLE))
break loop; break loop;
@ -174,6 +203,11 @@ public abstract class IteratingCallback implements Callback
{ {
switch(_state.get()) switch(_state.get())
{ {
case ITERATE_AGAIN:
if (_state.compareAndSet(State.ITERATE_AGAIN,State.FAILED))
break loop;
continue;
case ITERATING: case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.FAILED)) if (_state.compareAndSet(State.ITERATING,State.FAILED))
break loop; break loop;

View File

@ -54,15 +54,15 @@ public class IteratingCallbackTest
int i=10; int i=10;
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
processed++; processed++;
if (i-->1) if (i-->1)
{ {
succeeded(); // fake a completed IO operation succeeded(); // fake a completed IO operation
return State.SCHEDULED; return Next.SCHEDULED;
} }
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
}; };
@ -81,15 +81,15 @@ public class IteratingCallbackTest
int i=4; int i=4;
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
processed++; processed++;
if (i-->1) if (i-->1)
{ {
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS); scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS);
return State.SCHEDULED; return Next.SCHEDULED;
} }
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
}; };
@ -108,15 +108,15 @@ public class IteratingCallbackTest
int i=4; int i=4;
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
processed++; processed++;
if (i-->1) if (i-->1)
{ {
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS); scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS);
return State.SCHEDULED; return Next.SCHEDULED;
} }
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
}; };
@ -145,7 +145,7 @@ public class IteratingCallbackTest
int i=10; int i=10;
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
processed++; processed++;
if (i-->1) if (i-->1)
@ -154,9 +154,9 @@ public class IteratingCallbackTest
succeeded(); // fake a completed IO operation succeeded(); // fake a completed IO operation
else else
failed(new Exception("testing")); failed(new Exception("testing"));
return State.SCHEDULED; return Next.SCHEDULED;
} }
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
}; };
@ -173,15 +173,15 @@ public class IteratingCallbackTest
int i=4; int i=4;
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
processed++; processed++;
if (i-->1) if (i-->1)
{ {
scheduler.schedule(i>2?successTask:failTask,50,TimeUnit.MILLISECONDS); scheduler.schedule(i>2?successTask:failTask,50,TimeUnit.MILLISECONDS);
return State.SCHEDULED; return Next.SCHEDULED;
} }
return State.SUCCEEDED; return Next.SUCCEEDED;
} }
}; };
@ -202,7 +202,7 @@ public class IteratingCallbackTest
int i=5; int i=5;
@Override @Override
protected State process() protected Next process()
{ {
processed++; processed++;
@ -210,11 +210,11 @@ public class IteratingCallbackTest
{ {
case 5: case 5:
succeeded(); succeeded();
return State.SCHEDULED; return Next.SCHEDULED;
case 4: case 4:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS); scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS);
return State.SCHEDULED; return Next.SCHEDULED;
case 3: case 3:
scheduler.schedule(new Runnable() scheduler.schedule(new Runnable()
@ -225,18 +225,18 @@ public class IteratingCallbackTest
idle.countDown(); idle.countDown();
} }
},5,TimeUnit.MILLISECONDS); },5,TimeUnit.MILLISECONDS);
return State.IDLE; return Next.IDLE;
case 2: case 2:
succeeded(); succeeded();
return State.SCHEDULED; return Next.SCHEDULED;
case 1: case 1:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS); scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS);
return State.SCHEDULED; return Next.SCHEDULED;
case 0: case 0:
return State.SUCCEEDED; return Next.SUCCEEDED;
default: default:
throw new IllegalStateException(); throw new IllegalStateException();

View File

@ -217,7 +217,7 @@ public class FrameFlusher
} }
@Override @Override
protected State process() throws Exception protected Next process() throws Exception
{ {
synchronized (lock) synchronized (lock)
{ {
@ -241,11 +241,11 @@ public class FrameFlusher
} }
if (buffers.size()==0) if (buffers.size()==0)
return State.IDLE; return Next.IDLE;
endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()])); endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear(); buffers.clear();
return State.SCHEDULED; return Next.SCHEDULED;
} }
@Override @Override