jetty-9: Fixed concurrent updates to volatile variable _interestOps.

Read interest and write interest can be setting concurrently, and they may cancel each other.
Replaced _interestOps with an AtomicInteger and checking whether the update succeeds,
otherwise it is reattempted.
This commit is contained in:
Simone Bordet 2012-10-12 15:58:52 +02:00
parent 7e30c4ac20
commit 39fb81c486
2 changed files with 52 additions and 31 deletions

View File

@ -22,6 +22,7 @@ import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.io.SelectorManager.ManagedSelector; import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -45,7 +46,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
if (getChannel().isOpen()) if (getChannel().isOpen())
{ {
int oldInterestOps = _key.interestOps(); int oldInterestOps = _key.interestOps();
int newInterestOps = _interestOps; int newInterestOps = _interestOps.get();
if (newInterestOps != oldInterestOps) if (newInterestOps != oldInterestOps)
setKeyInterests(oldInterestOps, newInterestOps); setKeyInterests(oldInterestOps, newInterestOps);
} }
@ -72,7 +73,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
/** /**
* The desired value for {@link SelectionKey#interestOps()} * The desired value for {@link SelectionKey#interestOps()}
*/ */
private volatile int _interestOps; private final AtomicInteger _interestOps = new AtomicInteger();
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout) public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
{ {
@ -105,6 +106,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
@Override @Override
public void onSelected() public void onSelected()
{ {
assert _selector.isSelectorThread();
int oldInterestOps = _key.interestOps(); int oldInterestOps = _key.interestOps();
int readyOps = _key.readyOps(); int readyOps = _key.readyOps();
int newInterestOps = oldInterestOps & ~readyOps; int newInterestOps = oldInterestOps & ~readyOps;
@ -119,33 +121,45 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
private void updateLocalInterests(int operation, boolean add) private void updateLocalInterests(int operation, boolean add)
{ {
int oldInterestOps = _interestOps; while (true)
int newInterestOps;
if (add)
newInterestOps = oldInterestOps | operation;
else
newInterestOps = oldInterestOps & ~operation;
if (isInputShutdown())
newInterestOps &= ~SelectionKey.OP_READ;
if (isOutputShutdown())
newInterestOps &= ~SelectionKey.OP_WRITE;
if (newInterestOps != oldInterestOps)
{ {
_interestOps = newInterestOps; int oldInterestOps = _interestOps.get();
LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this); int newInterestOps;
_selector.submit(_updateTask); if (add)
} newInterestOps = oldInterestOps | operation;
else else
{ newInterestOps = oldInterestOps & ~operation;
LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
if (isInputShutdown())
newInterestOps &= ~SelectionKey.OP_READ;
if (isOutputShutdown())
newInterestOps &= ~SelectionKey.OP_WRITE;
if (newInterestOps != oldInterestOps)
{
if (_interestOps.compareAndSet(oldInterestOps, newInterestOps))
{
LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
_selector.submit(_updateTask);
}
else
{
LOG.debug("Local interests update conflict: now {}, was {}, attempted {}", _interestOps.get(), oldInterestOps, newInterestOps);
continue;
}
}
else
{
LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
}
break;
} }
} }
private void setKeyInterests(int oldInterestOps, int newInterestOps) private void setKeyInterests(int oldInterestOps, int newInterestOps)
{ {
assert _selector.isSelectorThread();
LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps); LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
_key.interestOps(newInterestOps); _key.interestOps(newInterestOps);
} }
@ -185,18 +199,20 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
// Do NOT use synchronized (this) // Do NOT use synchronized (this)
// because it's very easy to deadlock when debugging is enabled. // because it's very easy to deadlock when debugging is enabled.
// We do a best effort to print the right toString() and that's it. // We do a best effort to print the right toString() and that's it.
String keyString = ""; try
if (_key.isValid())
{ {
if (_key.isReadable()) boolean valid = _key.isValid();
keyString += "r"; int keyInterests = valid ? _key.interestOps() : -1;
if (_key.isWritable()) int keyReadiness = valid ? _key.readyOps() : -1;
keyString += "w"; return String.format("%s{io=%d,kio=%d,kro=%d}",
super.toString(),
_interestOps.get(),
keyInterests,
keyReadiness);
} }
else catch (CancelledKeyException x)
{ {
keyString += "!"; return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps.get());
} }
return String.format("%s{io=%d,k=%s}",super.toString(), _interestOps, keyString);
} }
} }

View File

@ -516,6 +516,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
_selector.wakeup(); _selector.wakeup();
} }
public boolean isSelectorThread()
{
return Thread.currentThread() == _thread;
}
private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
{ {
EndPoint endPoint = newEndPoint(channel, this, selectionKey); EndPoint endPoint = newEndPoint(channel, this, selectionKey);