Avoid StackOverflowErrors when submitting changes.
These were possible on a busy server, when many new connections are created, and each triggers read interest: one connection submits the read interest change, then runs the changes, finds that another connections is created, runs it, which schedule a read interest change, and so on. Now the code is simpler, and while we always offer to the queue, it may even be faster.
This commit is contained in:
parent
78dfd287e3
commit
ddca8bc327
|
@ -362,56 +362,41 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
*/
|
||||
public void submit(Runnable change)
|
||||
{
|
||||
if (isSelectorThread())
|
||||
{
|
||||
if (_state.get() == State.PROCESS)
|
||||
{
|
||||
// We are processing, so lets handle existing changes
|
||||
runChanges();
|
||||
// and then directly run the passed change without queueing it
|
||||
runChange(change);
|
||||
}
|
||||
else
|
||||
{
|
||||
// We must be iterating in CHANGES or MORE_CHANGES
|
||||
// state, so just append to the queue to preserve order.
|
||||
_changes.offer(change);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Otherwise we have to queue the change and possibly wakeup the selector
|
||||
_changes.offer(change);
|
||||
LOG.debug("Queued change {}", change);
|
||||
// This method may be called from the selector thread, and therefore
|
||||
// we could directly run the change without queueing, but this may
|
||||
// lead to stack overflows on a busy server, so we always offer the
|
||||
// change to the queue and process the state.
|
||||
|
||||
out: while (true)
|
||||
_changes.offer(change);
|
||||
LOG.debug("Queued change {}", change);
|
||||
|
||||
out: while (true)
|
||||
{
|
||||
switch (_state.get())
|
||||
{
|
||||
switch (_state.get())
|
||||
{
|
||||
case SELECT:
|
||||
// Avoid multiple wakeup() calls if we the CAS fails
|
||||
if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
|
||||
continue;
|
||||
wakeup();
|
||||
break out;
|
||||
case CHANGES:
|
||||
// Tell the selector thread that we have more changes.
|
||||
// If we fail to CAS, we possibly need to wakeup(), so loop.
|
||||
if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
|
||||
break out;
|
||||
case SELECT:
|
||||
// Avoid multiple wakeup() calls if we the CAS fails
|
||||
if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
|
||||
continue;
|
||||
case WAKEUP:
|
||||
// Do nothing, we have already a wakeup scheduled
|
||||
wakeup();
|
||||
break out;
|
||||
case CHANGES:
|
||||
// Tell the selector thread that we have more changes.
|
||||
// If we fail to CAS, we possibly need to wakeup(), so loop.
|
||||
if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
|
||||
break out;
|
||||
case MORE_CHANGES:
|
||||
// Do nothing, we already notified the selector thread of more changes
|
||||
break out;
|
||||
case PROCESS:
|
||||
// Do nothing, the changes will be run after the processing
|
||||
break out;
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
continue;
|
||||
case WAKEUP:
|
||||
// Do nothing, we have already a wakeup scheduled
|
||||
break out;
|
||||
case MORE_CHANGES:
|
||||
// Do nothing, we already notified the selector thread of more changes
|
||||
break out;
|
||||
case PROCESS:
|
||||
// Do nothing, the changes will be run after the processing
|
||||
break out;
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue