Fixed race condition in onSelected().

WebSocket and multiplexed protocols are always read interested.
It may happen that while the application is writing, the write
blocks, resulting in a call to changeInterests().
At the same time, the selector may detect data to read and call
onSelected(), so there is a possibility that onSelected() runs
concurrently with changeInterests().

The fix adds an additional state (PROCESSING) that isolates the
changes that onSelected() makes to _interestOps, spin-waiting if
changeInterests() is running concurrently.
Likewise, changeInterests() spin-waits until onSelected() is running
concurrently.
This commit is contained in:
Simone Bordet 2014-10-09 22:46:04 +02:00
parent 76278d3563
commit 96cf942b70
1 changed files with 51 additions and 20 deletions

View File

@ -94,28 +94,57 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
public void onSelected()
{
/**
* This method never runs concurrently with other
* methods that update _interestState.
* This method may run concurrently with {@link #changeInterests(int, boolean)}.
*/
assert _selector.isSelectorThread();
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
int readyOps = _key.readyOps();
int oldInterestOps = _interestOps;
int newInterestOps = oldInterestOps & ~readyOps;
_interestOps = newInterestOps;
while (true)
{
State current = _interestState.get();
if (LOG.isDebugEnabled())
LOG.debug("Processing, state {} for {}", current, this);
switch (current)
{
case SELECTING:
{
if (!_interestState.compareAndSet(current, State.PROCESSING))
continue;
break;
}
case PROCESSING:
{
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
int readyOps = _key.readyOps();
int oldInterestOps = _interestOps;
int newInterestOps = oldInterestOps & ~readyOps;
_interestOps = newInterestOps;
if (!_interestState.compareAndSet(State.SELECTING, State.PENDING))
throw new IllegalStateException("Invalid state: " + _interestState);
if (!_interestState.compareAndSet(current, State.PENDING))
throw new IllegalStateException("Invalid state: " + _interestState);
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this);
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this);
if ((readyOps & SelectionKey.OP_READ) != 0)
getFillInterest().fillable();
if ((readyOps & SelectionKey.OP_WRITE) != 0)
getWriteFlusher().completeWrite();
if ((readyOps & SelectionKey.OP_READ) != 0)
getFillInterest().fillable();
if ((readyOps & SelectionKey.OP_WRITE) != 0)
getWriteFlusher().completeWrite();
return;
}
case CHANGING:
{
// Wait until the change is finished.
Thread.yield();
break;
}
default:
{
throw new IllegalStateException("Invalid state: " + current);
}
}
}
}
@Override
@ -150,7 +179,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
case UPDATING:
{
if (!_interestState.compareAndSet(current, State.SELECTING))
throw new IllegalStateException();
throw new IllegalStateException("Invalid state: " + _interestState);
// Set the key interest as expected.
setKeyInterests();
return;
@ -172,7 +201,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
}
default:
{
throw new IllegalStateException();
throw new IllegalStateException("Invalid state: " + current);
}
}
}
@ -181,7 +210,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
private void changeInterests(int operation, boolean add)
{
/**
* This method may run concurrently with {@link #updateKey()}.
* This method may run concurrently with
* {@link #updateKey()} and {@link #onSelected()}.
*/
boolean pending = false;
@ -200,6 +230,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
pending = current == State.PENDING;
break;
}
case PROCESSING:
case UPDATING:
{
// We lost the race to update _interestOps, but we
@ -236,7 +267,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
case CHANGING_UPDATING:
{
if (!_interestState.compareAndSet(current, State.SELECTING))
throw new IllegalStateException("Invalid state " + current);
throw new IllegalStateException("Invalid state " + _interestState);
// If we could CAS, we let the selector thread
// update the key since it will be less expensive.
return;
@ -316,6 +347,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
private enum State
{
SELECTING, PENDING, UPDATING, CHANGING, CHANGING_UPDATING
SELECTING, PROCESSING, PENDING, UPDATING, CHANGING, CHANGING_UPDATING
}
}