diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 0b3c5a348dd..38edc2bd2af 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -153,31 +153,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du @Override public Runnable produce() - { - boolean looping = false; - while (true) - { - if (looping) - { - Runnable task = runActions(); - if (task != null) - return task; - - if (!select()) - return null; - } - - Runnable task = processSelected(); - if (task != null) - return task; - - update(); - - looping = true; - } - } - - public Runnable produce2() { while (true) { @@ -341,189 +316,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du { } -/* - @Override - public Runnable produce() - { - try - { - while (isRunning() || isStopping()) - { - if (!_selections.hasNext()) - { - // Do we have selected keys? - if (!_selectedKeys.isEmpty()) - { - // Yes, then update those keys. - for (SelectionKey key : _selectedKeys) - updateKey(key); - _selectedKeys.clear(); - } - - runChangesAndSetSelecting(); - - selectAndSetProcessing(); - } - - // Process any selected keys - while (_selections.hasNext()) - { - SelectionKey key = _selections.next(); - - if (key.isValid()) - { - Object attachment = key.attachment(); - try - { - if (attachment instanceof SelectableEndPoint) - { - // Try to produce a task - Runnable task = ((SelectableEndPoint)attachment).onSelected(); - if (task != null) - return task; - } - else if (key.isConnectable()) - { - processConnect(key, (Connect)attachment); - } - else if (key.isAcceptable()) - { - processAccept(key); - } - else - { - throw new IllegalStateException(); - } - } - catch (CancelledKeyException x) - { - LOG.debug("Ignoring cancelled key for channel {}", key.channel()); - if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); - } - catch (Throwable x) - { - LOG.warn("Could not process key for channel " + key.channel(), x); - if (attachment instanceof EndPoint) - closeNoExceptions((EndPoint)attachment); - } - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel()); - Object attachment = key.attachment(); - if (attachment instanceof EndPoint) - ((EndPoint)attachment).close(); - } - } - } - return null; - } - catch (Throwable x) - { - if (isRunning()) - LOG.warn(x); - else - LOG.ignore(x); - return null; - } - } - - private void runChangesAndSetSelecting() - { - // Run the changes, and only exit if we ran all changes - loop: - while (true) - { - State state = _state.get(); - switch (state) - { - case PROCESSING: - // We can loop on _runChanges list without lock, because only access here. - int size = _runChanges.size(); - for (int i = 0; i < size; i++) - runChange(_runChanges.get(i)); - _runChanges.clear(); - - if (!_state.compareAndSet(state, State.LOCKED)) - continue; - - // Do we have new changes? - if (_addChanges.isEmpty()) - { - // No, so lets go selecting. - _state.set(State.SELECTING); - break loop; - } - - // We have changes, so switch add/run lists and keep processing. - List tmp = _runChanges; - _runChanges = _addChanges; - _addChanges = tmp; - _state.set(State.PROCESSING); - continue; - - case LOCKED: - Thread.yield(); - continue; - - default: - throw new IllegalStateException(); - } - } - } - - private void selectAndSetProcessing() throws IOException - { - // Do the selecting! - if (LOG.isDebugEnabled()) - LOG.debug("Selector loop waiting on select"); - int selected = _selector.select(); - if (LOG.isDebugEnabled()) - LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size()); - - // We have finished selecting. This while loop could probably be replaced with just - // _state.compareAndSet(State.SELECTING, State.PROCESSING) - // since if state is locked by submit, the resulting state will be PROCESSING anyway. - // But let's be thorough and do the full loop. - out: - while (true) - { - switch (_state.get()) - { - case SELECTING: - // We were still in selecting state, so probably have - // selected a key, so goto processing state to handle. - if (_state.compareAndSet(State.SELECTING, State.PROCESSING)) - continue; - break out; - case PROCESSING: - // We were already in processing, so were woken up by a change being - // submitted, so no state change needed - lets just process. - break out; - case LOCKED: - // A change is currently being submitted. This does not matter - // here so much, but we will spin anyway so we don't race it later - // nor overwrite its state change. - Thread.yield(); - continue; - default: - throw new IllegalStateException(); - } - } - - _selectedKeys = _selector.selectedKeys(); - _selections = _selectedKeys.iterator(); - } - - private void updateKey(SelectionKey key) - { - Object attachment = key.attachment(); - if (attachment instanceof SelectableEndPoint) - ((SelectableEndPoint)attachment).updateKey(); - } -*/ private Runnable processConnect(SelectionKey key, final Connect connect) { SocketChannel channel = (SocketChannel)key.channel(); @@ -594,6 +386,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du _selectorManager.endPointOpened(endPoint); Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment()); endPoint.setConnection(connection); + selectionKey.attach(endPoint); _selectorManager.connectionOpened(connection); if (LOG.isDebugEnabled()) LOG.debug("Created {}", endPoint); @@ -758,8 +551,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Du { try { - EndPoint endpoint = createEndPoint(channel, key); - key.attach(endpoint); + createEndPoint(channel, key); } catch (Throwable x) { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SpinLock.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SpinLock.java index 975af9f9c03..23c7a18679d 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SpinLock.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SpinLock.java @@ -22,7 +22,13 @@ import java.util.concurrent.atomic.AtomicBoolean; /* ------------------------------------------------------------ */ -/** +/** Spin Lock + *

This is a lock designed to protect VERY short sections of + * critical code. Threads attempting to take the lock will spin + * forever until the lock is available, thus it is important that + * the code protected by this lock is extremely simple and non + * blocking. The reason for this lock is that it prevents a thread + * from giving up a CPU core when contending for the lock.

*
  * try(SpinLock.Lock lock = spinlock.lock())
  * {
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceRun.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceRun.java
index 91227059760..5cff8f50ea5 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceRun.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceRun.java
@@ -88,7 +88,7 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
         }
 
         if (produce)
-            produce();
+            produceAndRun();
     }
     
     @Override
@@ -107,10 +107,10 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
         }
         
         if (produce)
-            produce();
+            produceAndRun();
     }
     
-    private void produce()
+    private void produceAndRun()
     {
         if (LOG.isDebugEnabled())
             LOG.debug("{} produce enter",this);