From edbf6e07aabb1bae2180cb3f8640482392526fe2 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 16 Aug 2013 16:35:38 +1000 Subject: [PATCH] 415062 SelectorManager wakeup optimisation --- .../org/eclipse/jetty/io/SelectorManager.java | 145 +++++++++++++----- .../eclipse/jetty/server/HttpConnection.java | 4 +- 2 files changed, 106 insertions(+), 43 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index f8b13277259..b55492ef35f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -39,6 +39,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.TypeUtil; @@ -58,6 +59,8 @@ import org.eclipse.jetty.util.thread.Scheduler; public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable { protected static final Logger LOG = Log.getLogger(SelectorManager.class); + protected static final int SELECT_PERIOD=Integer.valueOf(System.getProperty("org.eclipse.jetty.io.SELECT_PERIOD","1000")); + protected static final int WAKEUP_SPIN_PERIOD=Integer.valueOf(System.getProperty("org.eclipse.jetty.io.WAKEUP_SPIN_PERIOD","1")); /** * The default connect timeout, in milliseconds */ @@ -309,6 +312,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors)); } + + enum SelectorState { CHANGING, MORE_CHANGES, SELECTING, WAKING, PROCESSING }; + /** *

{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.

*

{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events @@ -318,12 +324,12 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable { private final Queue _changes = new ConcurrentArrayQueue<>(); - + private final AtomicReference _state = new AtomicReference<>(SelectorState.PROCESSING); private final int _id; private Selector _selector; - private volatile Thread _thread; - private boolean _needsWakeup = true; - private boolean _runningChanges = false; + private volatile int _sequence; + private Thread _thread; + public ManagedSelector(int id) { @@ -362,7 +368,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { // If we are already iterating over the changes, just add this change to the list. // No race here because it is this thread that is iterating over the changes. - if (_runningChanges) + if (_state.get()==SelectorState.CHANGING) _changes.offer(change); else { @@ -377,28 +383,61 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa // otherwise we have to queue the change and wakeup the selector _changes.offer(change); LOG.debug("Queued change {}", change); - boolean wakeup = _needsWakeup; - if (wakeup) - wakeup(); + + // Should we wakeup? + loop: while(true) + { + switch (_state.get()) + { + case CHANGING: + // We are still in changing state, so we can switch to MORE_CHANGES to ensure another + // pass through the change list before going to SELECTING state. + if (!_state.compareAndSet(SelectorState.CHANGING,SelectorState.MORE_CHANGES)) + continue; + break loop; + + case SELECTING: + // If we are SELECTING, goto WAKING state so only one caller will spin on wakeup. + if (!_state.compareAndSet(SelectorState.SELECTING,SelectorState.WAKING)) + continue; + + // Spin doing wakeups until we see the select has moved to the next sequence. + // This spin handles the race of doing a wakeup just before a select call. + final long sequence=_sequence; + try + { + do + { + wakeup(); + + // We don't want to spin too fast as wakeup is not cheap, but sleeping might sleep for a long + // time, so this might need to be profiled and tuned? + if (WAKEUP_SPIN_PERIOD==0) + Thread.yield(); + else + Thread.sleep(WAKEUP_SPIN_PERIOD); + } + while(sequence==_sequence); + } + catch(InterruptedException e) + { + LOG.ignore(e); + } + break loop; + + default: + // We must be WAKING or PROCESSING, so change will be run in due course without a wakeup + break loop; + } + } } } private void runChanges() { - try - { - if (_runningChanges) - throw new IllegalStateException(); - _runningChanges=true; - - Runnable change; - while ((change = _changes.poll()) != null) - runChange(change); - } - finally - { - _runningChanges=false; - } + Runnable change; + while ((change = _changes.poll()) != null) + runChange(change); } protected void runChange(Runnable change) @@ -411,6 +450,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa public void run() { _thread = Thread.currentThread(); + _sequence++; // volatile increment for memory barrier String name = _thread.getName(); try { @@ -418,12 +458,14 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa LOG.debug("Starting {} on {}", _thread, this); while (isRunning()) select(); - processChanges(); + runChanges(); } finally { LOG.debug("Stopped {} on {}", _thread, this); _thread.setName(name); + _thread=null; + _sequence++; // volatile increment for memory barrier } } @@ -437,16 +479,49 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa boolean debug = LOG.isDebugEnabled(); try { - processChanges(); - + // handle changes + if (!_state.compareAndSet(SelectorState.PROCESSING,SelectorState.CHANGING)) + throw new IllegalStateException(); + change_loop: while (true) + { + switch (_state.get()) + { + case CHANGING: + // We are still in CHANGING, so run the changes. + runChanges(); + // If we can switch to SELECTING break the loop + if (_state.compareAndSet(SelectorState.CHANGING,SelectorState.SELECTING)) + break change_loop; + // otherwise loop + continue; + + case MORE_CHANGES: + // If we are MORE_CHANGES, then more were added while we were running, so + // switch back to CHANGING and run again. + if (!_state.compareAndSet(SelectorState.MORE_CHANGES,SelectorState.CHANGING)) + throw new IllegalStateException(); + continue; + + default: + throw new IllegalStateException(); + } + } + + // If we got here, we must have switched to SELECTING state, so let's do it! if (debug) LOG.debug("Selector loop waiting on select"); - int selected = _selector.select(); + int selected = _selector.select(SELECT_PERIOD); + + // increment the sequence number to end any spinning wakeups + _sequence++; + + // we are now definitely switching to PROCESSING state + _state.set(SelectorState.PROCESSING); + if (debug) LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size()); - _needsWakeup = false; - + // Process Keys Set selectedKeys = _selector.selectedKeys(); for (SelectionKey key : selectedKeys) { @@ -474,20 +549,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } } - private void processChanges() - { - runChanges(); - - // If tasks are submitted between these 2 statements, they will not - // wakeup the selector, therefore below we run again the tasks - - _needsWakeup = true; - - // Run again the tasks to avoid the race condition where a task is - // submitted but will not wake up the selector - runChanges(); - } - private void processKey(SelectionKey key) { Object attachment = key.attachment(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index faca50e73c5..aa5ff4891e0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -228,7 +228,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } catch (Exception e) { - if (_parser.isIdle()) + if (!_connector.isRunning()) + LOG.ignore(e); + else if (_parser.isIdle()) LOG.debug(e); else LOG.warn(this.toString(), e);