460211 Fixed Idle race in ExecuteProduceRun

Fixed new race in refactored SelectorManager between attaching to key and calling onOpen
This commit is contained in:
Greg Wilkins 2015-02-19 10:52:35 +11:00
parent cc44952cc8
commit e541865cef
3 changed files with 12 additions and 214 deletions

View File

@ -153,31 +153,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
@Override
public Runnable produce()
{
boolean looping = false;
while (true)
{
if (looping)
{
Runnable task = runActions();
if (task != null)
return task;
if (!select())
return null;
}
Runnable task = processSelected();
if (task != null)
return task;
update();
looping = true;
}
}
public Runnable produce2()
{
while (true)
{
@ -341,189 +316,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
{
}
/*
@Override
public Runnable produce()
{
try
{
while (isRunning() || isStopping())
{
if (!_selections.hasNext())
{
// Do we have selected keys?
if (!_selectedKeys.isEmpty())
{
// Yes, then update those keys.
for (SelectionKey key : _selectedKeys)
updateKey(key);
_selectedKeys.clear();
}
runChangesAndSetSelecting();
selectAndSetProcessing();
}
// Process any selected keys
while (_selections.hasNext())
{
SelectionKey key = _selections.next();
if (key.isValid())
{
Object attachment = key.attachment();
try
{
if (attachment instanceof SelectableEndPoint)
{
// Try to produce a task
Runnable task = ((SelectableEndPoint)attachment).onSelected();
if (task != null)
return task;
}
else if (key.isConnectable())
{
processConnect(key, (Connect)attachment);
}
else if (key.isAcceptable())
{
processAccept(key);
}
else
{
throw new IllegalStateException();
}
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
catch (Throwable x)
{
LOG.warn("Could not process key for channel " + key.channel(), x);
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
((EndPoint)attachment).close();
}
}
}
return null;
}
catch (Throwable x)
{
if (isRunning())
LOG.warn(x);
else
LOG.ignore(x);
return null;
}
}
private void runChangesAndSetSelecting()
{
// Run the changes, and only exit if we ran all changes
loop:
while (true)
{
State state = _state.get();
switch (state)
{
case PROCESSING:
// We can loop on _runChanges list without lock, because only access here.
int size = _runChanges.size();
for (int i = 0; i < size; i++)
runChange(_runChanges.get(i));
_runChanges.clear();
if (!_state.compareAndSet(state, State.LOCKED))
continue;
// Do we have new changes?
if (_addChanges.isEmpty())
{
// No, so lets go selecting.
_state.set(State.SELECTING);
break loop;
}
// We have changes, so switch add/run lists and keep processing.
List<Runnable> tmp = _runChanges;
_runChanges = _addChanges;
_addChanges = tmp;
_state.set(State.PROCESSING);
continue;
case LOCKED:
Thread.yield();
continue;
default:
throw new IllegalStateException();
}
}
}
private void selectAndSetProcessing() throws IOException
{
// Do the selecting!
if (LOG.isDebugEnabled())
LOG.debug("Selector loop waiting on select");
int selected = _selector.select();
if (LOG.isDebugEnabled())
LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
// We have finished selecting. This while loop could probably be replaced with just
// _state.compareAndSet(State.SELECTING, State.PROCESSING)
// since if state is locked by submit, the resulting state will be PROCESSING anyway.
// But let's be thorough and do the full loop.
out:
while (true)
{
switch (_state.get())
{
case SELECTING:
// We were still in selecting state, so probably have
// selected a key, so goto processing state to handle.
if (_state.compareAndSet(State.SELECTING, State.PROCESSING))
continue;
break out;
case PROCESSING:
// We were already in processing, so were woken up by a change being
// submitted, so no state change needed - lets just process.
break out;
case LOCKED:
// A change is currently being submitted. This does not matter
// here so much, but we will spin anyway so we don't race it later
// nor overwrite its state change.
Thread.yield();
continue;
default:
throw new IllegalStateException();
}
}
_selectedKeys = _selector.selectedKeys();
_selections = _selectedKeys.iterator();
}
private void updateKey(SelectionKey key)
{
Object attachment = key.attachment();
if (attachment instanceof SelectableEndPoint)
((SelectableEndPoint)attachment).updateKey();
}
*/
private Runnable processConnect(SelectionKey key, final Connect connect)
{
SocketChannel channel = (SocketChannel)key.channel();
@ -594,6 +386,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
_selectorManager.endPointOpened(endPoint);
Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
endPoint.setConnection(connection);
selectionKey.attach(endPoint);
_selectorManager.connectionOpened(connection);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", endPoint);
@ -758,8 +551,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du
{
try
{
EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
createEndPoint(channel, key);
}
catch (Throwable x)
{

View File

@ -22,7 +22,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
/* ------------------------------------------------------------ */
/**
/** Spin Lock
* <p>This is a lock designed to protect VERY short sections of
* critical code. Threads attempting to take the lock will spin
* forever until the lock is available, thus it is important that
* the code protected by this lock is extremely simple and non
* blocking. The reason for this lock is that it prevents a thread
* from giving up a CPU core when contending for the lock.</p>
* <pre>
* try(SpinLock.Lock lock = spinlock.lock())
* {

View File

@ -88,7 +88,7 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
}
if (produce)
produce();
produceAndRun();
}
@Override
@ -107,10 +107,10 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
}
if (produce)
produce();
produceAndRun();
}
private void produce()
private void produceAndRun()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produce enter",this);