Issue #4798 - Better handling of fatal Selector failures.

More updates after review.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-04-30 10:55:29 +02:00
parent 8c75eeccce
commit 0a028b663b
3 changed files with 51 additions and 34 deletions

View File

@ -88,7 +88,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
} }
private final ManagedSelector.SelectorUpdate _updateKeyAction = this::updateKey; private final ManagedSelector.SelectorUpdate _updateKeyAction = this::updateKeyAction;
private final Runnable _runFillable = new RunnableCloseable("runFillable") private final Runnable _runFillable = new RunnableCloseable("runFillable")
{ {
@ -315,13 +315,12 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
@Override @Override
public Runnable onSelected(SelectionKey key) public Runnable onSelected()
{ {
// This method runs from the selector thread, // This method runs from the selector thread,
// possibly concurrently with changeInterests(int). // possibly concurrently with changeInterests(int).
_key = key; int readyOps = _key.readyOps();
int readyOps = key.readyOps();
int oldInterestOps; int oldInterestOps;
int newInterestOps; int newInterestOps;
synchronized (this) synchronized (this)
@ -353,13 +352,13 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
return task; return task;
} }
private void updateKey(Selector selector) private void updateKeyAction(Selector selector)
{ {
_selector.runKeyAction(selector, _channel, _key, this::updateKey); updateKey();
} }
@Override @Override
public void updateKey(SelectionKey key) public void updateKey()
{ {
// This method runs from the selector thread, // This method runs from the selector thread,
// possibly concurrently with changeInterests(int). // possibly concurrently with changeInterests(int).
@ -376,7 +375,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
if (oldInterestOps != newInterestOps) if (oldInterestOps != newInterestOps)
{ {
_currentInterestOps = newInterestOps; _currentInterestOps = newInterestOps;
key.interestOps(newInterestOps); _key.interestOps(newInterestOps);
} }
} }
@ -396,6 +395,12 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
} }
@Override
public void replaceKey(SelectionKey newKey)
{
_key = newKey;
}
private void changeInterests(int operation) private void changeInterests(int operation)
{ {
// This method runs from any thread, possibly // This method runs from any thread, possibly

View File

@ -42,7 +42,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -145,17 +144,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
super.doStop(); super.doStop();
} }
void runKeyAction(Selector selector, SelectableChannel channel, SelectionKey selectionKey, Consumer<SelectionKey> action)
{
SelectionKey key = selectionKey;
// Refresh the key if the selector has been recreated.
if (selector != key.selector())
key = channel.keyFor(selector);
// The key may be null if the channel is closed.
if (key != null)
action.accept(key);
}
protected int nioSelect(Selector selector, boolean now) throws IOException protected int nioSelect(Selector selector, boolean now) throws IOException
{ {
return now ? selector.selectNow() : selector.select(); return now ? selector.selectNow() : selector.select();
@ -206,7 +194,9 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
try try
{ {
Object attachment = oldKey.attachment(); Object attachment = oldKey.attachment();
channel.register(newSelector, interestOps, attachment); SelectionKey newKey = channel.register(newSelector, interestOps, attachment);
if (attachment instanceof Selectable)
((Selectable)attachment).replaceKey(newKey);
oldKey.cancel(); oldKey.cancel();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Transferred {} iOps={} att={}", channel, interestOps, attachment); LOG.debug("Transferred {} iOps={} att={}", channel, interestOps, attachment);
@ -361,7 +351,18 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey); EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment()); Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
endPoint.setConnection(connection); endPoint.setConnection(connection);
submit(selector -> runKeyAction(selector, channel, selectionKey, key -> key.attach(endPoint)), true); submit(selector ->
{
SelectionKey key = selectionKey;
if (key.selector() != selector)
{
key = channel.keyFor(selector);
if (key != null && endPoint instanceof Selectable)
((Selectable)endPoint).replaceKey(key);
}
if (key != null)
key.attach(endPoint);
}, true);
endPoint.onOpen(); endPoint.onOpen();
endPointOpened(endPoint); endPointOpened(endPoint);
_selectorManager.connectionOpened(connection); _selectorManager.connectionOpened(connection);
@ -469,16 +470,22 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
* detected by the {@link ManagedSelector} for this endpoint. * detected by the {@link ManagedSelector} for this endpoint.
* *
* @return a job that may block or null * @return a job that may block or null
* @param key the selected SelectionKey
*/ */
Runnable onSelected(SelectionKey key); Runnable onSelected();
/** /**
* Callback method invoked when all the keys selected by the * Callback method invoked when all the keys selected by the
* {@link ManagedSelector} for this endpoint have been processed. * {@link ManagedSelector} for this endpoint have been processed.
* @param key the SelectionKey to update
*/ */
void updateKey(SelectionKey key); void updateKey();
/**
* Callback method invoked when the SelectionKey is replaced
* because the channel has been moved to a new selector.
*
* @param newKey the new SelectionKey
*/
void replaceKey(SelectionKey newKey);
} }
private class SelectorProducer implements ExecutionStrategy.Producer private class SelectorProducer implements ExecutionStrategy.Producer
@ -623,7 +630,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
if (attachment instanceof Selectable) if (attachment instanceof Selectable)
{ {
// Try to produce a task // Try to produce a task
Runnable task = ((Selectable)attachment).onSelected(key); Runnable task = ((Selectable)attachment).onSelected();
if (task != null) if (task != null)
return task; return task;
} }
@ -667,7 +674,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
Object attachment = key.attachment(); Object attachment = key.attachment();
if (attachment instanceof Selectable) if (attachment instanceof Selectable)
((Selectable)attachment).updateKey(key); ((Selectable)attachment).updateKey();
} }
_keys.clear(); _keys.clear();
} }
@ -759,9 +766,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
@Override @Override
public Runnable onSelected(SelectionKey key) public Runnable onSelected()
{ {
_key = key;
SelectableChannel channel = null; SelectableChannel channel = null;
try try
{ {
@ -782,16 +788,22 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
@Override @Override
public void updateKey(SelectionKey key) public void updateKey()
{ {
} }
@Override
public void replaceKey(SelectionKey newKey)
{
_key = newKey;
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
// May be called from any thread. // May be called from any thread.
// Implements AbstractConnector.setAccepting(boolean). // Implements AbstractConnector.setAccepting(boolean).
submit(selector -> runKeyAction(selector, _channel, _key, SelectionKey::cancel)); submit(selector -> _key.cancel());
} }
} }

View File

@ -79,10 +79,10 @@ public class ExtendedServerTest extends HttpServerTestBase
} }
@Override @Override
public Runnable onSelected(SelectionKey key) public Runnable onSelected()
{ {
_lastSelected = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); _lastSelected = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
return super.onSelected(key); return super.onSelected();
} }
long getLastSelected() long getLastSelected()