443713 - Reduce number of SelectionKey.setInterestOps() calls.

Introduced a state machine to handle the various scenarios (ST = selector thread, Tx = pooled thread):

ST: call to SCEP.onSelected() moves from SELECTING -> PENDING.
ST: call to SCEP.updateKey() moves from PENDING -> UPDATING -> SELECTING
T1: call to SCEP.changeInterests() moves (SELECTING | PENDING) -> CHANGING -> SELECTING

The race between ST and T1 to move from PENDING to either UPDATING or CHANGING will be won
by one thread only, which will then perform the call to SelectionKey.setInterestOps().
Preferably, this will be done by ST during an updateKey() call. If updateKey() has already
been invoked, then changeInterests() will perform the call to SelectionKey.setInterestOps().

However, if T1 loses, it still has to perform the key update, so it will spin until ST
moves back to SELECTING.
This commit is contained in:
Simone Bordet 2014-09-10 16:34:25 +02:00
parent 1ee11138a7
commit fd1c9dd8d2
2 changed files with 204 additions and 59 deletions

View File

@ -22,7 +22,7 @@ import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
import org.eclipse.jetty.util.log.Log;
@ -43,27 +43,21 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
{
try
{
if (getChannel().isOpen())
{
int oldInterestOps = _key.interestOps();
int newInterestOps = _interestOps.get();
if (newInterestOps != oldInterestOps)
setKeyInterests(oldInterestOps, newInterestOps);
}
setKeyInterests();
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring key update for concurrently closed channel {}", this);
close();
}
catch (Exception x)
catch (Throwable x)
{
LOG.warn("Ignoring key update for " + this, x);
close();
}
}
};
private final AtomicReference<State> _interestState = new AtomicReference<>(State.SELECTING);
/**
* true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
*/
@ -73,11 +67,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
/**
* The desired value for {@link SelectionKey#interestOps()}
*/
private final AtomicInteger _interestOps = new AtomicInteger();
private int _interestOps;
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
{
super(scheduler,channel);
super(scheduler, channel);
_selector = selector;
_key = key;
setIdleTimeout(idleTimeout);
@ -86,78 +80,183 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
@Override
protected boolean needsFill()
{
updateLocalInterests(SelectionKey.OP_READ, true);
return false;
return !changeInterests(SelectionKey.OP_READ, true);
}
@Override
protected void onIncompleteFlush()
{
updateLocalInterests(SelectionKey.OP_WRITE, true);
changeInterests(SelectionKey.OP_WRITE, true);
}
@Override
public void onSelected()
{
/**
* This method never runs concurrently with other
* methods that update _interestState.
*/
assert _selector.isSelectorThread();
int oldInterestOps = _key.interestOps();
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
int readyOps = _key.readyOps();
int oldInterestOps = _interestOps;
int newInterestOps = oldInterestOps & ~readyOps;
setKeyInterests(oldInterestOps, newInterestOps);
updateLocalInterests(readyOps, false);
if (_key.isReadable())
_interestOps = newInterestOps;
if (!_interestState.compareAndSet(State.SELECTING, State.PENDING))
throw new IllegalStateException("Invalid state: " + _interestState);
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this);
if ((readyOps & SelectionKey.OP_READ) != 0)
getFillInterest().fillable();
if (_key.isWritable())
if ((readyOps & SelectionKey.OP_WRITE) != 0)
getWriteFlusher().completeWrite();
}
private void updateLocalInterests(int operation, boolean add)
@Override
public void updateKey()
{
/**
* This method may run concurrently with {@link #changeInterests(int, boolean)}.
*/
assert _selector.isSelectorThread();
while (true)
{
int oldInterestOps = _interestOps.get();
int newInterestOps;
if (add)
newInterestOps = oldInterestOps | operation;
else
newInterestOps = oldInterestOps & ~operation;
if (isInputShutdown())
newInterestOps &= ~SelectionKey.OP_READ;
if (isOutputShutdown())
newInterestOps &= ~SelectionKey.OP_WRITE;
if (newInterestOps != oldInterestOps)
State current = _interestState.get();
switch (current)
{
if (_interestOps.compareAndSet(oldInterestOps, newInterestOps))
case SELECTING:
{
if (LOG.isDebugEnabled())
LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
_selector.updateKey(_updateTask);
// When a whole cycle triggered by changeInterests()
// happens, we finish the job by updating the key.
setKeyInterests();
return;
}
else
case PENDING:
{
if (LOG.isDebugEnabled())
LOG.debug("Local interests update conflict: now {}, was {}, attempted {} for {}", _interestOps.get(), oldInterestOps, newInterestOps, this);
continue;
if (!_interestState.compareAndSet(current, State.UPDATING))
continue;
break;
}
case UPDATING:
{
// Set the key interest as expected.
setKeyInterests();
if (!_interestState.compareAndSet(current, State.SELECTING))
throw new IllegalStateException();
return;
}
case CHANGING:
{
// We lost the race to update _interestOps,
// let changeInterests() perform the update.
return;
}
default:
{
throw new IllegalStateException();
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
}
break;
}
}
private void setKeyInterests(int oldInterestOps, int newInterestOps)
private boolean changeInterests(int operation, boolean add)
{
/**
* This method may run concurrently with {@link #updateKey()}.
*/
boolean pending = false;
boolean changed = true;
while (true)
{
State current = _interestState.get();
switch (current)
{
case SELECTING:
case PENDING:
{
if (!_interestState.compareAndSet(current, State.CHANGING))
continue;
pending = current == State.PENDING;
break;
}
case UPDATING:
{
// We lost the race to update _interestOps, but we
// must update it nonetheless, so yield and spin,
// waiting for the state to be SELECTING again.
Thread.yield();
break;
}
case CHANGING:
{
int oldInterestOps = _interestOps;
int newInterestOps;
if (add)
newInterestOps = oldInterestOps | operation;
else
newInterestOps = oldInterestOps & ~operation;
if (isInputShutdown())
{
newInterestOps &= ~SelectionKey.OP_READ;
if (add && (operation & SelectionKey.OP_READ) != 0)
changed = false;
}
if (isOutputShutdown())
{
newInterestOps &= ~SelectionKey.OP_WRITE;
if (add && (operation & SelectionKey.OP_WRITE) != 0)
changed = false;
}
if (LOG.isDebugEnabled())
LOG.debug("changeInterests pending={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
if (newInterestOps != oldInterestOps)
_interestOps = newInterestOps;
if (!_interestState.compareAndSet(current, State.SELECTING))
throw new IllegalStateException("Invalid state: " + current);
// We only update the key if updateKey() does not do it for us,
// because doing it from the selector thread is less expensive.
// This must be done after CASing the state above, otherwise the
// selector may select and call onSelected() concurrently.
submitKeyUpdate(!pending);
return changed;
}
default:
{
throw new IllegalStateException();
}
}
}
}
protected void submitKeyUpdate(boolean submit)
{
if (submit)
_selector.updateKey(_updateTask);
}
private void setKeyInterests()
{
int oldInterestOps = _key.interestOps();
int newInterestOps = _interestOps;
if (LOG.isDebugEnabled())
LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
_key.interestOps(newInterestOps);
LOG.debug("Key interests update {} -> {}", oldInterestOps, newInterestOps);
if (oldInterestOps != newInterestOps)
_key.interestOps(newInterestOps);
}
@Override
@ -199,13 +298,18 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
int keyReadiness = valid ? _key.readyOps() : -1;
return String.format("%s{io=%d,kio=%d,kro=%d}",
super.toString(),
_interestOps.get(),
_interestOps,
keyInterests,
keyReadiness);
}
catch (CancelledKeyException x)
{
return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps.get());
return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps);
}
}
private enum State
{
SELECTING, PENDING, UPDATING, CHANGING
}
}

View File

@ -650,6 +650,17 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
((EndPoint)attachment).close();
}
}
// Allow any dispatched tasks to run.
Thread.yield();
// Update the keys.
for (SelectionKey key : selectedKeys)
{
if (key.isValid())
updateKey(key);
}
selectedKeys.clear();
}
catch (Throwable x)
@ -697,6 +708,30 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
}
private void updateKey(SelectionKey key)
{
Object attachment = key.attachment();
try
{
if (attachment instanceof SelectableEndPoint)
{
((SelectableEndPoint)attachment).updateKey();
}
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
catch (Throwable x)
{
LOG.warn("Could not process key for channel " + key.channel(), x);
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
}
private void processConnect(SelectionKey key, Connect connect)
{
SocketChannel channel = (SocketChannel)key.channel();
@ -1075,15 +1110,21 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
/**
* A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be notified of
* non-blocking events by the {@link ManagedSelector}.
* A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be
* notified of non-blocking events by the {@link ManagedSelector}.
*/
public interface SelectableEndPoint extends EndPoint
{
/**
* <p>Callback method invoked when a read or write events has been detected by the {@link ManagedSelector}
* for this endpoint.</p>
* Callback method invoked when a read or write events has been
* detected by the {@link ManagedSelector} for this endpoint.
*/
void onSelected();
/**
* Callback method invoked when all the keys selected by the
* {@link ManagedSelector} for this endpoint have been processed.
*/
void updateKey();
}
}