Issue #4798 - Better handling of fatal Selector failures.
Updates after review. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
952a20f81c
commit
8c75eeccce
|
@ -320,6 +320,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
// This method runs from the selector thread,
|
||||
// possibly concurrently with changeInterests(int).
|
||||
|
||||
_key = key;
|
||||
int readyOps = key.readyOps();
|
||||
int oldInterestOps;
|
||||
int newInterestOps;
|
||||
|
@ -354,7 +355,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
|
||||
private void updateKey(Selector selector)
|
||||
{
|
||||
_selector.compute(selector, _channel, _key, this::updateKey);
|
||||
_selector.runKeyAction(selector, _channel, _key, this::updateKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -395,12 +396,6 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void replaceKey(SelectionKey oldKey, SelectionKey newKey)
|
||||
{
|
||||
_key = newKey;
|
||||
}
|
||||
|
||||
private void changeInterests(int operation)
|
||||
{
|
||||
// This method runs from any thread, possibly
|
||||
|
|
|
@ -145,7 +145,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
super.doStop();
|
||||
}
|
||||
|
||||
void compute(Selector selector, SelectableChannel channel, SelectionKey selectionKey, Consumer<SelectionKey> action)
|
||||
void runKeyAction(Selector selector, SelectableChannel channel, SelectionKey selectionKey, Consumer<SelectionKey> action)
|
||||
{
|
||||
SelectionKey key = selectionKey;
|
||||
// Refresh the key if the selector has been recreated.
|
||||
|
@ -206,11 +206,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
try
|
||||
{
|
||||
Object attachment = oldKey.attachment();
|
||||
SelectionKey newKey = channel.register(newSelector, interestOps, attachment);
|
||||
|
||||
if (attachment instanceof Selectable)
|
||||
((Selectable)attachment).replaceKey(oldKey, newKey);
|
||||
|
||||
channel.register(newSelector, interestOps, attachment);
|
||||
oldKey.cancel();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Transferred {} iOps={} att={}", channel, interestOps, attachment);
|
||||
|
@ -365,7 +361,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
|
||||
Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
|
||||
endPoint.setConnection(connection);
|
||||
submit(selector -> compute(selector, channel, selectionKey, key -> key.attach(endPoint)), true);
|
||||
submit(selector -> runKeyAction(selector, channel, selectionKey, key -> key.attach(endPoint)), true);
|
||||
endPoint.onOpen();
|
||||
endPointOpened(endPoint);
|
||||
_selectorManager.connectionOpened(connection);
|
||||
|
@ -483,15 +479,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
* @param key the SelectionKey to update
|
||||
*/
|
||||
void updateKey(SelectionKey key);
|
||||
|
||||
/**
|
||||
* Callback method invoked when a selectable is transferred
|
||||
* from one selector to a new selector.
|
||||
*
|
||||
* @param oldKey the old SelectionKey
|
||||
* @param newKey the new SelectionKey
|
||||
*/
|
||||
void replaceKey(SelectionKey oldKey, SelectionKey newKey);
|
||||
}
|
||||
|
||||
private class SelectorProducer implements ExecutionStrategy.Producer
|
||||
|
@ -774,6 +761,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
@Override
|
||||
public Runnable onSelected(SelectionKey key)
|
||||
{
|
||||
_key = key;
|
||||
SelectableChannel channel = null;
|
||||
try
|
||||
{
|
||||
|
@ -798,18 +786,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void replaceKey(SelectionKey oldKey, SelectionKey newKey)
|
||||
{
|
||||
_key = newKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
// May be called from any thread.
|
||||
// Implements AbstractConnector.setAccepting(boolean).
|
||||
submit(selector -> compute(selector, _channel, _key, SelectionKey::cancel));
|
||||
submit(selector -> runKeyAction(selector, _channel, _key, SelectionKey::cancel));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue