Rewrite of ManagedSelector to handle connection creation as an ExecutionStrategy task.

Now the creation of a connection, and the Connection.onOpen() call
happen as a Runnable that is run by the ExecutionStrategy.
This allows onOpen() to block or otherwise perform tasks that are not
suitable to be run by a selector thread, since the ExecutionStrategy
will guarantee that another thread will take over the selector duties.
This commit is contained in:
Simone Bordet 2015-02-18 23:00:12 +01:00
parent 59ae845d30
commit adaa520cc9
3 changed files with 311 additions and 134 deletions

View File

@ -27,24 +27,21 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.SelectorManager.SelectableEndPoint;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SpinLock;
/**
* <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
@ -52,24 +49,23 @@ import org.eclipse.jetty.util.thread.Scheduler;
* happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
* with the channel.</p>
*/
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable, ExecutionStrategy.Producer
public class ManagedSelector extends AbstractLifeCycle implements Runnable//, Dumpable TODO
{
protected static final Logger LOG = Log.getLogger(ManagedSelector.class);
private final ExecutionStrategy _strategy;
private static final Logger LOG = Log.getLogger(ManagedSelector.class);
private final SpinLock _lock = new SpinLock();
private final Queue<Runnable> _actions = new ConcurrentArrayQueue<>();
private final SelectorManager _selectorManager;
private final AtomicReference<State> _state = new AtomicReference<>(State.PROCESSING);
private final int _id;
private List<Runnable> _runChanges = new ArrayList<>();
private List<Runnable> _addChanges = new ArrayList<>();
private Iterator<SelectionKey> _selections = Collections.emptyIterator();
private Set<SelectionKey> _selectedKeys = Collections.emptySet();
private final ExecutionStrategy _strategy;
private State _state = State.PROCESSING;
private Selector _selector;
public ManagedSelector(SelectorManager selectorManager, int id)
{
_selectorManager = selectorManager;
_strategy = ExecutionStrategy.Factory.instanceFor(this, selectorManager.getExecutor());
_id = id;
_strategy = ExecutionStrategy.Factory.instanceFor(new SelectorProducer(), selectorManager.getExecutor());
setStopTimeout(5000);
}
@ -78,7 +74,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
{
super.doStart();
_selector = newSelector();
_state.set(State.PROCESSING);
}
protected Selector newSelector() throws IOException
@ -89,7 +84,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
public int size()
{
Selector s = _selector;
if (s==null)
if (s == null)
return 0;
return s.keys().size();
}
@ -106,81 +101,247 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
LOG.debug("Stopped {}", this);
}
/**
* <p>Submits a change to be executed in the selector thread.</p>
* <p>Changes may be submitted from any thread, and the selector thread woken up
* (if necessary) to execute the change.</p>
*
* @param change the change to submit
*/
public void submit(Runnable change)
{
// This method may be called from the selector thread, and therefore
// we could directly run the change without queueing, but this may
// lead to stack overflows on a busy server, so we always offer the
// change to the queue and process the state.
if (LOG.isDebugEnabled())
LOG.debug("Queued change {}", change);
out:
while (true)
try (SpinLock.Lock lock = _lock.lock())
{
State state = _state.get();
switch (state)
_actions.offer(change);
if (_state == State.SELECTING)
{
case PROCESSING:
// If we are processing
if (!_state.compareAndSet(State.PROCESSING, State.LOCKED))
continue;
// we can just lock and add the change
_addChanges.add(change);
_state.set(State.PROCESSING);
break out;
case SELECTING:
// If we are processing
if (!_state.compareAndSet(State.SELECTING, State.LOCKED))
continue;
// we must lock, add the change and wakeup the selector
_addChanges.add(change);
_selector.wakeup();
// we move to processing state now, because the selector will
// not block and this avoids extra calls to wakeup()
_state.set(State.PROCESSING);
break out;
case LOCKED:
Thread.yield();
continue;
default:
throw new IllegalStateException();
_selector.wakeup();
// Move to PROCESSING now, so other submit()
// calls will avoid the extra select wakeup.
_state = State.PROCESSING;
}
}
}
protected void runChange(Runnable change)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("Running change {}", change);
change.run();
}
catch (Throwable x)
{
LOG.debug("Could not run change " + change, x);
}
}
@Override
public void run()
{
_strategy.execute();
}
/**
* A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be
* notified of non-blocking events by the {@link ManagedSelector}.
*/
public interface SelectableEndPoint extends EndPoint
{
/**
* Callback method invoked when a read or write events has been
* detected by the {@link ManagedSelector} for this endpoint.
*
* @return a job that may block or null
*/
Runnable onSelected();
/**
* Callback method invoked when all the keys selected by the
* {@link ManagedSelector} for this endpoint have been processed.
*/
void updateKey();
}
private class SelectorProducer implements ExecutionStrategy.Producer
{
private Set<SelectionKey> _keys = Collections.emptySet();
private Iterator<SelectionKey> _cursor = Collections.emptyIterator();
@Override
public Runnable produce()
{
boolean looping = false;
while (true)
{
if (looping)
{
Runnable task = runActions();
if (task != null)
return task;
if (!select())
return null;
}
Runnable task = processSelected();
if (task != null)
return task;
update();
looping = true;
}
}
public Runnable produce2()
{
while (true)
{
Runnable task = processSelected();
if (task != null)
return task;
Runnable action = runActions();
if (action != null)
return action;
update();
if (!select())
return null;
}
}
private Runnable runActions()
{
while (true)
{
Runnable action;
try (SpinLock.Lock lock = _lock.lock())
{
action = _actions.poll();
if (action == null)
{
_state = State.SELECTING;
return null;
}
}
if (action instanceof Product)
return action;
// Running the change may queue another action.
runChange(action);
}
}
private void runChange(Runnable change)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("Running change {}", change);
change.run();
}
catch (Throwable x)
{
LOG.debug("Could not run change " + change, x);
}
}
private boolean select()
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("Selector loop waiting on select");
int selected = _selector.select();
if (LOG.isDebugEnabled())
LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
try (SpinLock.Lock lock = _lock.lock())
{
_state = State.PROCESSING;
}
_keys = _selector.selectedKeys();
_cursor = _keys.iterator();
return true;
}
catch (Throwable x)
{
closeNoExceptions(_selector);
if (isRunning())
LOG.warn(x);
else
LOG.debug(x);
return false;
}
}
private Runnable processSelected()
{
while (_cursor.hasNext())
{
SelectionKey key = _cursor.next();
if (key.isValid())
{
Object attachment = key.attachment();
try
{
if (attachment instanceof SelectableEndPoint)
{
// Try to produce a task
SelectableEndPoint selectable = (SelectableEndPoint)attachment;
Runnable task = selectable.onSelected();
if (task != null)
return task;
}
else if (key.isConnectable())
{
Runnable task = processConnect(key, (Connect)attachment);
if (task != null)
return task;
}
else if (key.isAcceptable())
{
processAccept(key);
}
else
{
throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps());
}
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
catch (Throwable x)
{
LOG.warn("Could not process key for channel " + key.channel(), x);
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
}
return null;
}
private void update()
{
for (SelectionKey key : _keys)
updateKey(key);
_keys.clear();
}
private void updateKey(SelectionKey key)
{
Object attachment = key.attachment();
if (attachment instanceof SelectableEndPoint)
((SelectableEndPoint)attachment).updateKey();
}
}
private interface Product extends Runnable
{
}
/*
@Override
public Runnable produce()
{
@ -362,8 +523,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
if (attachment instanceof SelectableEndPoint)
((SelectableEndPoint)attachment).updateKey();
}
private void processConnect(SelectionKey key, Connect connect)
*/
private Runnable processConnect(SelectionKey key, final Connect connect)
{
SocketChannel channel = (SocketChannel)key.channel();
try
@ -374,8 +535,15 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
{
connect.timeout.cancel();
key.interestOps(0);
EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
return new CreateEndPoint(channel, key)
{
@Override
protected void failed(Throwable failure)
{
super.failed(failure);
connect.failed(failure);
}
};
}
else
{
@ -385,6 +553,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
catch (Throwable x)
{
connect.failed(x);
return null;
}
}
@ -433,6 +602,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
public void destroyEndPoint(EndPoint endPoint)
{
// TODO: perhaps this code should be wrapped and submitted as a task.
if (LOG.isDebugEnabled())
LOG.debug("Destroyed {}", endPoint);
Connection connection = endPoint.getConnection();
@ -440,7 +610,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
_selectorManager.connectionClosed(connection);
_selectorManager.endPointClosed(endPoint);
}
/*
@Override
public String dump()
{
@ -479,7 +649,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
@Override
public String toString()
{
@ -519,7 +688,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
}
*/
class Acceptor implements Runnable
{
private final ServerSocketChannel _channel;
@ -562,9 +731,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
{
try
{
SelectionKey key = channel.register(_selector, 0, attachment);
EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
final SelectionKey key = channel.register(_selector, 0, attachment);
submit(new CreateEndPoint(channel, key));
}
catch (Throwable x)
{
@ -574,6 +742,38 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
private class CreateEndPoint implements Product
{
private final SocketChannel channel;
private final SelectionKey key;
public CreateEndPoint(SocketChannel channel, SelectionKey key)
{
this.channel = channel;
this.key = key;
}
@Override
public void run()
{
try
{
EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
}
catch (Throwable x)
{
failed(x);
}
}
protected void failed(Throwable failure)
{
closeNoExceptions(channel);
LOG.debug(failure);
}
}
class Connect implements Runnable
{
private final AtomicBoolean failed = new AtomicBoolean();
@ -634,6 +834,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
// TODO: convert this to produce tasks that are run by the ExecutionStrategy.
private class Stop implements Runnable
{
private final CountDownLatch latch = new CountDownLatch(1);
@ -717,7 +918,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private enum State
{
PROCESSING, SELECTING, LOCKED
PROCESSING, SELECTING
}
}

View File

@ -31,7 +31,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
/**
* An ChannelEndpoint that can be scheduled by {@link SelectorManager}.
*/
public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableEndPoint
public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSelector.SelectableEndPoint
{
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
@ -115,17 +115,23 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
// selector will call updateKey before selecting again.
_interestState.set(State.UPDATE_PENDING);
}
if ((readyOps & SelectionKey.OP_READ) != 0)
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
if (readable)
{
if ((readyOps & SelectionKey.OP_WRITE) != 0)
if (writable)
return _runFillableCompleteWrite;
return _runFillable;
}
if ((readyOps & SelectionKey.OP_WRITE) != 0)
else if (writable)
{
return _runCompleteWrite;
return null;
}
else
{
return null;
}
}
case LOCKED:
{

View File

@ -304,27 +304,19 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*
* @param connection the connection just opened
*/
public void connectionOpened(final Connection connection)
public void connectionOpened(Connection connection)
{
// TODO remove this execution
getExecutor().execute(new Runnable()
try
{
@Override
public void run()
{
try
{
connection.onOpen();
}
catch (Throwable x)
{
if (isRunning())
LOG.warn("Exception while notifying connection " + connection, x);
else
LOG.debug("Exception while notifying connection " + connection, x);
}
}
});
connection.onOpen();
}
catch (Throwable x)
{
if (isRunning())
LOG.warn("Exception while notifying connection " + connection, x);
else
LOG.debug("Exception while notifying connection " + connection, x);
}
}
/**
@ -400,25 +392,4 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
ContainerLifeCycle.dumpObject(out, this);
ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
}
/**
* A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be
* notified of non-blocking events by the {@link ManagedSelector}.
*/
public interface SelectableEndPoint extends EndPoint
{
/**
* Callback method invoked when a read or write events has been
* detected by the {@link ManagedSelector} for this endpoint.
*
* @return a job that may block or null
*/
Runnable onSelected();
/**
* Callback method invoked when all the keys selected by the
* {@link ManagedSelector} for this endpoint have been processed.
*/
void updateKey();
}
}