Simplified SelectorManager state machine

Made the SelectorManager use the CaS state machine for both locking and controlling
the mode of handling changes.

Replaced the concurrent change queue with a pair of array lists that are switched while the lock state is held
This commit is contained in:
Greg Wilkins 2014-10-17 14:16:48 +11:00
parent e8a843dc6a
commit a906aaa266
2 changed files with 92 additions and 61 deletions

View File

@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
@ -392,7 +393,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private enum State
{
CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
CHANGING, SELECTING, PROCESSING, LOCKED
}
/**
@ -403,8 +404,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*/
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
{
private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESSING);
private List<Runnable> _runChanges = new ArrayList<>();
private List<Runnable> _addChanges = new ArrayList<>();
private final int _id;
private Selector _selector;
private volatile Thread _thread;
@ -420,7 +422,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
super.doStart();
_selector = newSelector();
_state.set(State.PROCESS);
_state.set(State.PROCESSING);
}
protected Selector newSelector() throws IOException
@ -447,57 +449,49 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* (if necessary) to execute the change.</p>
*
* @param change the change to submit
* @return true if the selector was woken up, false otherwise
*/
public boolean submit(Runnable change)
public void submit(Runnable change)
{
// This method may be called from the selector thread, and therefore
// we could directly run the change without queueing, but this may
// lead to stack overflows on a busy server, so we always offer the
// change to the queue and process the state.
_changes.offer(change);
if (LOG.isDebugEnabled())
LOG.debug("Queued change {}", change);
out: while (true)
{
switch (_state.get())
State state=_state.get();
switch (state)
{
case SELECT:
// Avoid multiple wakeup() calls if we the CAS fails
if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
case PROCESSING:
if (!_state.compareAndSet(State.PROCESSING, State.LOCKED))
continue;
_addChanges.add(change);
_state.set(State.PROCESSING);
break out;
case CHANGING:
if (!_state.compareAndSet(State.CHANGING, State.LOCKED))
continue;
_addChanges.add(change);
_state.set(State.CHANGING);
break out;
case SELECTING:
if (!_state.compareAndSet(State.SELECTING, State.LOCKED))
continue;
_addChanges.add(change);
_selector.wakeup();
return true;
case CHANGES:
// Tell the selector thread that we have more changes.
// If we fail to CAS, we possibly need to wakeup(), so loop.
if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
break out;
_state.set(State.PROCESSING);
break out;
case LOCKED:
Thread.yield();
continue;
case WAKEUP:
// Do nothing, we have already a wakeup scheduled
break out;
case MORE_CHANGES:
// Do nothing, we already notified the selector thread of more changes
break out;
case PROCESS:
// Do nothing, the changes will be run after the processing
break out;
default:
throw new IllegalStateException();
}
}
return false;
}
private void runChanges()
{
Runnable change;
while ((change = _changes.poll()) != null)
runChange(change);
}
protected void runChange(Runnable change)
@ -531,7 +525,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
while (isRunning())
select();
while (isStopping())
runChanges();
select();
}
finally
{
@ -553,38 +547,73 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
boolean debug = LOG.isDebugEnabled();
try
{
_state.set(State.CHANGES);
// Run the changes, and only exit if we ran all changes
loop: while(true)
{
State state=_state.get();
switch (state)
{
case CHANGING:
case PROCESSING:
int size = _runChanges.size();
for (int i=0;i<size;i++)
runChange(_runChanges.get(i));
_runChanges.clear();
if (!_state.compareAndSet(state, State.LOCKED))
continue;
if (_addChanges.isEmpty())
{
_state.set(State.SELECTING);
break loop;
}
List<Runnable> tmp=_runChanges;
_runChanges=_addChanges;
_addChanges=tmp;
_state.set(State.CHANGING);
continue;
case LOCKED:
Thread.yield();
continue;
default:
throw new IllegalStateException();
}
}
// Do the selecting!
int selected;
if (debug)
{
LOG.debug("Selector loop waiting on select");
selected = _selector.select();
LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
}
else
selected = _selector.select();
// Run the changes, and only exit if we ran all changes
out: while(true)
{
switch (_state.get())
{
case CHANGES:
runChanges();
if (_state.compareAndSet(State.CHANGES, State.SELECT))
break out;
case SELECTING:
if (_state.compareAndSet(State.SELECTING, State.PROCESSING))
continue;
break out;
case PROCESSING:
break out;
case LOCKED:
Thread.yield();
continue;
case MORE_CHANGES:
runChanges();
_state.set(State.CHANGES);
continue;
default:
throw new IllegalStateException();
case CHANGING:
throw new IllegalStateException();
}
}
// Must check first for SELECT and *then* for WAKEUP
// because we read the state twice in the assert, and
// it could change from SELECT to WAKEUP in between.
assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;
if (debug)
LOG.debug("Selector loop waiting on select");
int selected = _selector.select();
if (debug)
LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
_state.set(State.PROCESS);
Set<SelectionKey> selectedKeys = _selector.selectedKeys();
for (SelectionKey key : selectedKeys)

View File

@ -231,6 +231,8 @@ public class ServerConnector extends AbstractNetworkConnector
_manager = newSelectorManager(getExecutor(), getScheduler(),
selectors>0?selectors:Math.max(1,Math.min(4,Runtime.getRuntime().availableProcessors()/2)));
addBean(_manager, true);
setSelectorPriorityDelta(-1);
setAcceptorPriorityDelta(-2);
}
protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)