Fixed race in updating the key interests.
It was possible that updateKey() was seeing a SELECTING state and therefore attempt to call setKeyInterests(), while changeInterests() was also seeing the SELECTING state, then moving to CHANGING so that _interestOps was accessed concurrently. Also made the update task to call updateKey() instead of calling directly setKeyInterests(), in order to comply with the state machine; this required to have onSelected() handle additional states that are created by updateKey(). Finally, in updateKey() now setKeyInterests() is called before updating the state to isolate the call into its own state.
This commit is contained in:
parent
890d7a8d1f
commit
395ecd2b70
|
@ -41,20 +41,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
setKeyInterests();
|
||||
}
|
||||
catch (CancelledKeyException x)
|
||||
{
|
||||
LOG.debug("Ignoring key update for concurrently closed channel {}", this);
|
||||
close();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.warn("Ignoring key update for " + this, x);
|
||||
close();
|
||||
}
|
||||
updateKey();
|
||||
}
|
||||
};
|
||||
private final AtomicReference<State> _interestState = new AtomicReference<>(State.SELECTING);
|
||||
|
@ -80,21 +67,21 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
@Override
|
||||
protected boolean needsFill()
|
||||
{
|
||||
changeInterests(SelectionKey.OP_READ, true);
|
||||
changeInterests(SelectionKey.OP_READ);
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onIncompleteFlush()
|
||||
{
|
||||
changeInterests(SelectionKey.OP_WRITE, true);
|
||||
changeInterests(SelectionKey.OP_WRITE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSelected()
|
||||
{
|
||||
/**
|
||||
* This method may run concurrently with {@link #changeInterests(int, boolean)}.
|
||||
* This method may run concurrently with {@link #changeInterests(int)}.
|
||||
*/
|
||||
|
||||
assert _selector.isSelectorThread();
|
||||
|
@ -134,8 +121,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
return;
|
||||
}
|
||||
case CHANGING:
|
||||
case UPDATING:
|
||||
case CHANGING_UPDATING:
|
||||
{
|
||||
// Wait until the change is finished.
|
||||
// Wait until the modification is finished.
|
||||
Thread.yield();
|
||||
break;
|
||||
}
|
||||
|
@ -151,11 +140,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
public void updateKey()
|
||||
{
|
||||
/**
|
||||
* This method may run concurrently with {@link #changeInterests(int, boolean)}.
|
||||
* This method may run concurrently with
|
||||
* {@link #changeInterests(int)} and {@link #onSelected()}.
|
||||
*/
|
||||
|
||||
assert _selector.isSelectorThread();
|
||||
|
||||
while (true)
|
||||
{
|
||||
State current = _interestState.get();
|
||||
|
@ -165,10 +153,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
{
|
||||
case SELECTING:
|
||||
{
|
||||
// When a whole cycle triggered by changeInterests()
|
||||
// happens, we finish the job by updating the key.
|
||||
setKeyInterests();
|
||||
return;
|
||||
if (!_interestState.compareAndSet(current, State.UPDATING))
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
case PENDING:
|
||||
{
|
||||
|
@ -178,10 +165,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
}
|
||||
case UPDATING:
|
||||
{
|
||||
if (!_interestState.compareAndSet(current, State.SELECTING))
|
||||
throw new IllegalStateException("Invalid state: " + _interestState);
|
||||
// Set the key interest as expected.
|
||||
setKeyInterests();
|
||||
if (!_interestState.compareAndSet(current, State.SELECTING))
|
||||
throw new IllegalStateException("Invalid state: " + _interestState);
|
||||
return;
|
||||
}
|
||||
case CHANGING:
|
||||
|
@ -207,7 +194,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
}
|
||||
}
|
||||
|
||||
private void changeInterests(int operation, boolean add)
|
||||
private void changeInterests(int operation)
|
||||
{
|
||||
/**
|
||||
* This method may run concurrently with
|
||||
|
@ -242,11 +229,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
case CHANGING:
|
||||
{
|
||||
int oldInterestOps = _interestOps;
|
||||
int newInterestOps;
|
||||
if (add)
|
||||
newInterestOps = oldInterestOps | operation;
|
||||
else
|
||||
newInterestOps = oldInterestOps & ~operation;
|
||||
int newInterestOps = oldInterestOps | operation;
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("changeInterests pending={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
|
||||
|
@ -288,12 +271,25 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
|
||||
private void setKeyInterests()
|
||||
{
|
||||
int oldInterestOps = _key.interestOps();
|
||||
int newInterestOps = _interestOps;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Key interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
|
||||
if (oldInterestOps != newInterestOps)
|
||||
_key.interestOps(newInterestOps);
|
||||
try
|
||||
{
|
||||
int oldInterestOps = _key.interestOps();
|
||||
int newInterestOps = _interestOps;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Key interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
|
||||
if (oldInterestOps != newInterestOps)
|
||||
_key.interestOps(newInterestOps);
|
||||
}
|
||||
catch (CancelledKeyException x)
|
||||
{
|
||||
LOG.debug("Ignoring key update for concurrently closed channel {}", this);
|
||||
close();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.warn("Ignoring key update for " + this, x);
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -330,7 +326,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
// We do a best effort to print the right toString() and that's it.
|
||||
try
|
||||
{
|
||||
boolean valid = _key!=null && _key.isValid();
|
||||
boolean valid = _key != null && _key.isValid();
|
||||
int keyInterests = valid ? _key.interestOps() : -1;
|
||||
int keyReadiness = valid ? _key.readyOps() : -1;
|
||||
return String.format("%s{io=%d,kio=%d,kro=%d}",
|
||||
|
|
|
@ -716,25 +716,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
private void updateKey(SelectionKey key)
|
||||
{
|
||||
Object attachment = key.attachment();
|
||||
try
|
||||
{
|
||||
if (attachment instanceof SelectableEndPoint)
|
||||
{
|
||||
((SelectableEndPoint)attachment).updateKey();
|
||||
}
|
||||
}
|
||||
catch (CancelledKeyException x)
|
||||
{
|
||||
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
|
||||
if (attachment instanceof EndPoint)
|
||||
closeNoExceptions((EndPoint)attachment);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.warn("Could not process key for channel " + key.channel(), x);
|
||||
if (attachment instanceof EndPoint)
|
||||
closeNoExceptions((EndPoint)attachment);
|
||||
}
|
||||
if (attachment instanceof SelectableEndPoint)
|
||||
((SelectableEndPoint)attachment).updateKey();
|
||||
}
|
||||
|
||||
private void processConnect(SelectionKey key, Connect connect)
|
||||
|
|
Loading…
Reference in New Issue