diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index ae6969f1860..43ac4de85f0 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.util.ArrayQueue; import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.annotation.ManagedAttribute; @@ -392,7 +393,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private enum State { - CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS + CHANGING, SELECTING, PROCESSING, LOCKED } /** @@ -403,8 +404,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa */ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable { - private final AtomicReference _state= new AtomicReference<>(State.PROCESS); - private final Queue _changes = new ConcurrentArrayQueue<>(); + private final AtomicReference _state= new AtomicReference<>(State.PROCESSING); + private List _runChanges = new ArrayList<>(); + private List _addChanges = new ArrayList<>(); private final int _id; private Selector _selector; private volatile Thread _thread; @@ -420,7 +422,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { super.doStart(); _selector = newSelector(); - _state.set(State.PROCESS); + _state.set(State.PROCESSING); } protected Selector newSelector() throws IOException @@ -447,57 +449,49 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * (if necessary) to execute the change.

* * @param change the change to submit - * @return true if the selector was woken up, false otherwise */ - public boolean submit(Runnable change) + public void submit(Runnable change) { // This method may be called from the selector thread, and therefore // we could directly run the change without queueing, but this may // lead to stack overflows on a busy server, so we always offer the // change to the queue and process the state. - _changes.offer(change); if (LOG.isDebugEnabled()) LOG.debug("Queued change {}", change); out: while (true) { - switch (_state.get()) + State state=_state.get(); + switch (state) { - case SELECT: - // Avoid multiple wakeup() calls if we the CAS fails - if (!_state.compareAndSet(State.SELECT, State.WAKEUP)) + case PROCESSING: + if (!_state.compareAndSet(State.PROCESSING, State.LOCKED)) continue; + _addChanges.add(change); + _state.set(State.PROCESSING); + break out; + + case CHANGING: + if (!_state.compareAndSet(State.CHANGING, State.LOCKED)) + continue; + _addChanges.add(change); + _state.set(State.CHANGING); + break out; + + case SELECTING: + if (!_state.compareAndSet(State.SELECTING, State.LOCKED)) + continue; + _addChanges.add(change); _selector.wakeup(); - return true; - case CHANGES: - // Tell the selector thread that we have more changes. - // If we fail to CAS, we possibly need to wakeup(), so loop. - if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES)) - break out; + _state.set(State.PROCESSING); + break out; + + case LOCKED: + Thread.yield(); continue; - case WAKEUP: - // Do nothing, we have already a wakeup scheduled - break out; - case MORE_CHANGES: - // Do nothing, we already notified the selector thread of more changes - break out; - case PROCESS: - // Do nothing, the changes will be run after the processing - break out; - default: - throw new IllegalStateException(); } } - - return false; - } - - private void runChanges() - { - Runnable change; - while ((change = _changes.poll()) != null) - runChange(change); } protected void runChange(Runnable change) @@ -531,7 +525,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa while (isRunning()) select(); while (isStopping()) - runChanges(); + select(); } finally { @@ -553,38 +547,73 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa boolean debug = LOG.isDebugEnabled(); try { - _state.set(State.CHANGES); + + // Run the changes, and only exit if we ran all changes + loop: while(true) + { + State state=_state.get(); + switch (state) + { + case CHANGING: + case PROCESSING: + int size = _runChanges.size(); + for (int i=0;i tmp=_runChanges; + _runChanges=_addChanges; + _addChanges=tmp; + _state.set(State.CHANGING); + continue; + + case LOCKED: + Thread.yield(); + continue; + + default: + throw new IllegalStateException(); + } + } + + // Do the selecting! + int selected; + if (debug) + { + LOG.debug("Selector loop waiting on select"); + selected = _selector.select(); + LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size()); + } + else + selected = _selector.select(); // Run the changes, and only exit if we ran all changes out: while(true) { switch (_state.get()) { - case CHANGES: - runChanges(); - if (_state.compareAndSet(State.CHANGES, State.SELECT)) - break out; + case SELECTING: + if (_state.compareAndSet(State.SELECTING, State.PROCESSING)) + continue; + break out; + case PROCESSING: + break out; + case LOCKED: + Thread.yield(); continue; - case MORE_CHANGES: - runChanges(); - _state.set(State.CHANGES); - continue; - default: - throw new IllegalStateException(); + case CHANGING: + throw new IllegalStateException(); } } - // Must check first for SELECT and *then* for WAKEUP - // because we read the state twice in the assert, and - // it could change from SELECT to WAKEUP in between. - assert _state.get() == State.SELECT || _state.get() == State.WAKEUP; - - if (debug) - LOG.debug("Selector loop waiting on select"); - int selected = _selector.select(); - if (debug) - LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size()); - - _state.set(State.PROCESS); Set selectedKeys = _selector.selectedKeys(); for (SelectionKey key : selectedKeys) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java index 9be3b5b62c1..1478a6da187 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java @@ -231,6 +231,8 @@ public class ServerConnector extends AbstractNetworkConnector _manager = newSelectorManager(getExecutor(), getScheduler(), selectors>0?selectors:Math.max(1,Math.min(4,Runtime.getRuntime().availableProcessors()/2))); addBean(_manager, true); + setSelectorPriorityDelta(-1); + setAcceptorPriorityDelta(-2); } protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)