diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 7a8d4ac23dd..494dbb2ea87 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -22,6 +22,7 @@ import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.io.SelectorManager.ManagedSelector; import org.eclipse.jetty.util.log.Log; @@ -45,7 +46,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa if (getChannel().isOpen()) { int oldInterestOps = _key.interestOps(); - int newInterestOps = _interestOps; + int newInterestOps = _interestOps.get(); if (newInterestOps != oldInterestOps) setKeyInterests(oldInterestOps, newInterestOps); } @@ -72,7 +73,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa /** * The desired value for {@link SelectionKey#interestOps()} */ - private volatile int _interestOps; + private final AtomicInteger _interestOps = new AtomicInteger(); public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout) { @@ -105,6 +106,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa @Override public void onSelected() { + assert _selector.isSelectorThread(); int oldInterestOps = _key.interestOps(); int readyOps = _key.readyOps(); int newInterestOps = oldInterestOps & ~readyOps; @@ -119,33 +121,45 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa private void updateLocalInterests(int operation, boolean add) { - int oldInterestOps = _interestOps; - int newInterestOps; - if (add) - newInterestOps = oldInterestOps | operation; - else - newInterestOps = oldInterestOps & ~operation; - - if (isInputShutdown()) - newInterestOps &= ~SelectionKey.OP_READ; - if (isOutputShutdown()) - newInterestOps &= ~SelectionKey.OP_WRITE; - - if (newInterestOps != oldInterestOps) + while (true) { - _interestOps = newInterestOps; - LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this); - _selector.submit(_updateTask); - } - else - { - LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this); + int oldInterestOps = _interestOps.get(); + int newInterestOps; + if (add) + newInterestOps = oldInterestOps | operation; + else + newInterestOps = oldInterestOps & ~operation; + + if (isInputShutdown()) + newInterestOps &= ~SelectionKey.OP_READ; + if (isOutputShutdown()) + newInterestOps &= ~SelectionKey.OP_WRITE; + + if (newInterestOps != oldInterestOps) + { + if (_interestOps.compareAndSet(oldInterestOps, newInterestOps)) + { + LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this); + _selector.submit(_updateTask); + } + else + { + LOG.debug("Local interests update conflict: now {}, was {}, attempted {}", _interestOps.get(), oldInterestOps, newInterestOps); + continue; + } + } + else + { + LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this); + } + break; } } private void setKeyInterests(int oldInterestOps, int newInterestOps) { + assert _selector.isSelectorThread(); LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps); _key.interestOps(newInterestOps); } @@ -185,18 +199,20 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa // Do NOT use synchronized (this) // because it's very easy to deadlock when debugging is enabled. // We do a best effort to print the right toString() and that's it. - String keyString = ""; - if (_key.isValid()) + try { - if (_key.isReadable()) - keyString += "r"; - if (_key.isWritable()) - keyString += "w"; + boolean valid = _key.isValid(); + int keyInterests = valid ? _key.interestOps() : -1; + int keyReadiness = valid ? _key.readyOps() : -1; + return String.format("%s{io=%d,kio=%d,kro=%d}", + super.toString(), + _interestOps.get(), + keyInterests, + keyReadiness); } - else + catch (CancelledKeyException x) { - keyString += "!"; + return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps.get()); } - return String.format("%s{io=%d,k=%s}",super.toString(), _interestOps, keyString); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 2463a0c0afc..1aab8ba7698 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -516,6 +516,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa _selector.wakeup(); } + public boolean isSelectorThread() + { + return Thread.currentThread() == _thread; + } + private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException { EndPoint endPoint = newEndPoint(channel, this, selectionKey);