415062 SelectorManager wakeup optimisation
This commit is contained in:
parent
669f26566f
commit
edbf6e07aa
|
@ -39,6 +39,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.util.ConcurrentArrayQueue;
|
||||
import org.eclipse.jetty.util.TypeUtil;
|
||||
|
@ -58,6 +59,8 @@ import org.eclipse.jetty.util.thread.Scheduler;
|
|||
public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
|
||||
{
|
||||
protected static final Logger LOG = Log.getLogger(SelectorManager.class);
|
||||
protected static final int SELECT_PERIOD=Integer.valueOf(System.getProperty("org.eclipse.jetty.io.SELECT_PERIOD","1000"));
|
||||
protected static final int WAKEUP_SPIN_PERIOD=Integer.valueOf(System.getProperty("org.eclipse.jetty.io.WAKEUP_SPIN_PERIOD","1"));
|
||||
/**
|
||||
* The default connect timeout, in milliseconds
|
||||
*/
|
||||
|
@ -309,6 +312,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
|
||||
}
|
||||
|
||||
|
||||
enum SelectorState { CHANGING, MORE_CHANGES, SELECTING, WAKING, PROCESSING };
|
||||
|
||||
/**
|
||||
* <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
|
||||
* <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
|
||||
|
@ -318,12 +324,12 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
|
||||
{
|
||||
private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
|
||||
|
||||
private final AtomicReference<SelectorState> _state = new AtomicReference<>(SelectorState.PROCESSING);
|
||||
private final int _id;
|
||||
private Selector _selector;
|
||||
private volatile Thread _thread;
|
||||
private boolean _needsWakeup = true;
|
||||
private boolean _runningChanges = false;
|
||||
private volatile int _sequence;
|
||||
private Thread _thread;
|
||||
|
||||
|
||||
public ManagedSelector(int id)
|
||||
{
|
||||
|
@ -362,7 +368,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
// If we are already iterating over the changes, just add this change to the list.
|
||||
// No race here because it is this thread that is iterating over the changes.
|
||||
if (_runningChanges)
|
||||
if (_state.get()==SelectorState.CHANGING)
|
||||
_changes.offer(change);
|
||||
else
|
||||
{
|
||||
|
@ -377,28 +383,61 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
// otherwise we have to queue the change and wakeup the selector
|
||||
_changes.offer(change);
|
||||
LOG.debug("Queued change {}", change);
|
||||
boolean wakeup = _needsWakeup;
|
||||
if (wakeup)
|
||||
wakeup();
|
||||
|
||||
// Should we wakeup?
|
||||
loop: while(true)
|
||||
{
|
||||
switch (_state.get())
|
||||
{
|
||||
case CHANGING:
|
||||
// We are still in changing state, so we can switch to MORE_CHANGES to ensure another
|
||||
// pass through the change list before going to SELECTING state.
|
||||
if (!_state.compareAndSet(SelectorState.CHANGING,SelectorState.MORE_CHANGES))
|
||||
continue;
|
||||
break loop;
|
||||
|
||||
case SELECTING:
|
||||
// If we are SELECTING, goto WAKING state so only one caller will spin on wakeup.
|
||||
if (!_state.compareAndSet(SelectorState.SELECTING,SelectorState.WAKING))
|
||||
continue;
|
||||
|
||||
// Spin doing wakeups until we see the select has moved to the next sequence.
|
||||
// This spin handles the race of doing a wakeup just before a select call.
|
||||
final long sequence=_sequence;
|
||||
try
|
||||
{
|
||||
do
|
||||
{
|
||||
wakeup();
|
||||
|
||||
// We don't want to spin too fast as wakeup is not cheap, but sleeping might sleep for a long
|
||||
// time, so this might need to be profiled and tuned?
|
||||
if (WAKEUP_SPIN_PERIOD==0)
|
||||
Thread.yield();
|
||||
else
|
||||
Thread.sleep(WAKEUP_SPIN_PERIOD);
|
||||
}
|
||||
while(sequence==_sequence);
|
||||
}
|
||||
catch(InterruptedException e)
|
||||
{
|
||||
LOG.ignore(e);
|
||||
}
|
||||
break loop;
|
||||
|
||||
default:
|
||||
// We must be WAKING or PROCESSING, so change will be run in due course without a wakeup
|
||||
break loop;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void runChanges()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_runningChanges)
|
||||
throw new IllegalStateException();
|
||||
_runningChanges=true;
|
||||
|
||||
Runnable change;
|
||||
while ((change = _changes.poll()) != null)
|
||||
runChange(change);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_runningChanges=false;
|
||||
}
|
||||
Runnable change;
|
||||
while ((change = _changes.poll()) != null)
|
||||
runChange(change);
|
||||
}
|
||||
|
||||
protected void runChange(Runnable change)
|
||||
|
@ -411,6 +450,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
public void run()
|
||||
{
|
||||
_thread = Thread.currentThread();
|
||||
_sequence++; // volatile increment for memory barrier
|
||||
String name = _thread.getName();
|
||||
try
|
||||
{
|
||||
|
@ -418,12 +458,14 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
LOG.debug("Starting {} on {}", _thread, this);
|
||||
while (isRunning())
|
||||
select();
|
||||
processChanges();
|
||||
runChanges();
|
||||
}
|
||||
finally
|
||||
{
|
||||
LOG.debug("Stopped {} on {}", _thread, this);
|
||||
_thread.setName(name);
|
||||
_thread=null;
|
||||
_sequence++; // volatile increment for memory barrier
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -437,16 +479,49 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
boolean debug = LOG.isDebugEnabled();
|
||||
try
|
||||
{
|
||||
processChanges();
|
||||
|
||||
// handle changes
|
||||
if (!_state.compareAndSet(SelectorState.PROCESSING,SelectorState.CHANGING))
|
||||
throw new IllegalStateException();
|
||||
change_loop: while (true)
|
||||
{
|
||||
switch (_state.get())
|
||||
{
|
||||
case CHANGING:
|
||||
// We are still in CHANGING, so run the changes.
|
||||
runChanges();
|
||||
// If we can switch to SELECTING break the loop
|
||||
if (_state.compareAndSet(SelectorState.CHANGING,SelectorState.SELECTING))
|
||||
break change_loop;
|
||||
// otherwise loop
|
||||
continue;
|
||||
|
||||
case MORE_CHANGES:
|
||||
// If we are MORE_CHANGES, then more were added while we were running, so
|
||||
// switch back to CHANGING and run again.
|
||||
if (!_state.compareAndSet(SelectorState.MORE_CHANGES,SelectorState.CHANGING))
|
||||
throw new IllegalStateException();
|
||||
continue;
|
||||
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
// If we got here, we must have switched to SELECTING state, so let's do it!
|
||||
if (debug)
|
||||
LOG.debug("Selector loop waiting on select");
|
||||
int selected = _selector.select();
|
||||
int selected = _selector.select(SELECT_PERIOD);
|
||||
|
||||
// increment the sequence number to end any spinning wakeups
|
||||
_sequence++;
|
||||
|
||||
// we are now definitely switching to PROCESSING state
|
||||
_state.set(SelectorState.PROCESSING);
|
||||
|
||||
if (debug)
|
||||
LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
|
||||
|
||||
_needsWakeup = false;
|
||||
|
||||
// Process Keys
|
||||
Set<SelectionKey> selectedKeys = _selector.selectedKeys();
|
||||
for (SelectionKey key : selectedKeys)
|
||||
{
|
||||
|
@ -474,20 +549,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
}
|
||||
}
|
||||
|
||||
private void processChanges()
|
||||
{
|
||||
runChanges();
|
||||
|
||||
// If tasks are submitted between these 2 statements, they will not
|
||||
// wakeup the selector, therefore below we run again the tasks
|
||||
|
||||
_needsWakeup = true;
|
||||
|
||||
// Run again the tasks to avoid the race condition where a task is
|
||||
// submitted but will not wake up the selector
|
||||
runChanges();
|
||||
}
|
||||
|
||||
private void processKey(SelectionKey key)
|
||||
{
|
||||
Object attachment = key.attachment();
|
||||
|
|
|
@ -228,7 +228,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
|||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
if (_parser.isIdle())
|
||||
if (!_connector.isRunning())
|
||||
LOG.ignore(e);
|
||||
else if (_parser.isIdle())
|
||||
LOG.debug(e);
|
||||
else
|
||||
LOG.warn(this.toString(), e);
|
||||
|
|
Loading…
Reference in New Issue