Submit selector interest changes
After seeing a stack trace with many threads blocked on setInterest, have reverted to always submitting changes
This commit is contained in:
parent
70894962a1
commit
e8a843dc6a
|
@ -38,10 +38,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
|
||||
private enum State
|
||||
{
|
||||
SELECTING, SELECTED, LOCKED
|
||||
UPDATED, UPDATE_PENDING, LOCKED
|
||||
}
|
||||
|
||||
private final AtomicReference<State> _interestState = new AtomicReference<>(State.SELECTING);
|
||||
private final AtomicReference<State> _interestState = new AtomicReference<>(State.UPDATED);
|
||||
/**
|
||||
* true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
|
||||
*/
|
||||
|
@ -52,6 +52,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
* The desired value for {@link SelectionKey#interestOps()}
|
||||
*/
|
||||
private int _interestOps;
|
||||
|
||||
private final Runnable _runUpdateKey = new Runnable() { public void run() { updateKey(); } };
|
||||
|
||||
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
|
||||
{
|
||||
|
@ -90,8 +92,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
LOG.debug("Processing, state {} for {}", current, this);
|
||||
switch (current)
|
||||
{
|
||||
case SELECTED:
|
||||
case SELECTING:
|
||||
case UPDATE_PENDING:
|
||||
case UPDATED:
|
||||
{
|
||||
if (!_interestState.compareAndSet(current, State.LOCKED))
|
||||
continue;
|
||||
|
@ -110,7 +112,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
}
|
||||
finally
|
||||
{
|
||||
_interestState.set(State.SELECTED);
|
||||
// We have been called by onSelected, so we know the
|
||||
// selector will call updateKey before selecting again.
|
||||
_interestState.set(State.UPDATE_PENDING);
|
||||
}
|
||||
|
||||
if ((readyOps & SelectionKey.OP_READ) != 0)
|
||||
|
@ -149,8 +153,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
LOG.debug("Updating key, state {} for {}", current, this);
|
||||
switch (current)
|
||||
{
|
||||
case SELECTED:
|
||||
case SELECTING:
|
||||
case UPDATE_PENDING:
|
||||
case UPDATED:
|
||||
{
|
||||
if (!_interestState.compareAndSet(current, State.LOCKED))
|
||||
continue;
|
||||
|
@ -163,7 +167,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
}
|
||||
finally
|
||||
{
|
||||
_interestState.set(State.SELECTING);
|
||||
// We have just updated the key, so we are now updated!
|
||||
// and no call to unpdateKey is pending
|
||||
_interestState.set(State.UPDATED);
|
||||
}
|
||||
}
|
||||
case LOCKED:
|
||||
|
@ -194,34 +200,32 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
LOG.debug("Changing interests in state {} for {}", current, this);
|
||||
switch (current)
|
||||
{
|
||||
case SELECTED:
|
||||
case SELECTING:
|
||||
case UPDATE_PENDING:
|
||||
case UPDATED:
|
||||
{
|
||||
if (!_interestState.compareAndSet(current, State.LOCKED))
|
||||
continue;
|
||||
|
||||
boolean selecting = current == State.SELECTING;
|
||||
try
|
||||
{
|
||||
int oldInterestOps = _interestOps;
|
||||
int newInterestOps = oldInterestOps | operation;
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("changeInterests selecting={} {}->{} for {}", selecting, oldInterestOps, newInterestOps, this);
|
||||
LOG.debug("changeInterests s={} {}->{} for {}", current, oldInterestOps, newInterestOps, this);
|
||||
|
||||
if (newInterestOps != oldInterestOps)
|
||||
_interestOps = newInterestOps;
|
||||
|
||||
if (selecting)
|
||||
setKeyInterests();
|
||||
if (current==State.UPDATED)
|
||||
_selector.submit(_runUpdateKey);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_interestState.set(current);
|
||||
// If we were pending a call to updateKey, then we still are.
|
||||
// If we were not, then we have submitted a callback to runUpdateKey, so we now are pending.
|
||||
_interestState.set(State.UPDATE_PENDING);
|
||||
}
|
||||
|
||||
if (selecting)
|
||||
_selector.wakeup();
|
||||
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -492,12 +492,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
|
||||
return false;
|
||||
}
|
||||
|
||||
public void wakeup()
|
||||
{
|
||||
if (_state.compareAndSet(State.SELECT, State.WAKEUP))
|
||||
_selector.wakeup();
|
||||
}
|
||||
|
||||
private void runChanges()
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue