Issue #2003 - Do not submit blocking tasks as managed selector actions.

CreateEndPoint and DestroyEndPoint are now submitted directly to
the Executor, rather than being submitted as selector actions.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2017-12-01 18:34:49 +01:00
parent b1c80ba231
commit 2067eac701
1 changed files with 22 additions and 11 deletions

View File

@ -36,6 +36,7 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -148,7 +149,20 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
selector.wakeup(); selector.wakeup();
} }
private Runnable processConnect(SelectionKey key, final Connect connect) private void execute(Runnable task)
{
try
{
_selectorManager.execute(task);
}
catch (RejectedExecutionException x)
{
if (task instanceof Closeable)
closeNoExceptions((Closeable)task);
}
}
private void processConnect(SelectionKey key, final Connect connect)
{ {
SelectableChannel channel = key.channel(); SelectableChannel channel = key.channel();
try try
@ -162,7 +176,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
if (connect.timeout.cancel()) if (connect.timeout.cancel())
{ {
key.interestOps(0); key.interestOps(0);
return new CreateEndPoint(channel, key) execute(new CreateEndPoint(channel, key)
{ {
@Override @Override
protected void failed(Throwable failure) protected void failed(Throwable failure)
@ -170,7 +184,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
super.failed(failure); super.failed(failure);
connect.failed(failure); connect.failed(failure);
} }
}; });
} }
else else
{ {
@ -185,7 +199,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
catch (Throwable x) catch (Throwable x)
{ {
connect.failed(x); connect.failed(x);
return null;
} }
} }
@ -217,7 +230,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
public void destroyEndPoint(final EndPoint endPoint) public void destroyEndPoint(final EndPoint endPoint)
{ {
submit(new DestroyEndPoint(endPoint)); execute(new DestroyEndPoint(endPoint));
} }
private int getActionSize() private int getActionSize()
@ -424,9 +437,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
else if (key.isConnectable()) else if (key.isConnectable())
{ {
Runnable task = processConnect(key, (Connect)attachment); processConnect(key, (Connect)attachment);
if (task != null)
return task;
} }
else else
{ {
@ -621,7 +632,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
try try
{ {
final SelectionKey key = channel.register(_selector, 0, attachment); final SelectionKey key = channel.register(_selector, 0, attachment);
submit(new CreateEndPoint(channel, key)); execute(new CreateEndPoint(channel, key));
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -631,7 +642,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
private class CreateEndPoint implements Runnable, Invocable, Closeable private class CreateEndPoint implements Runnable, Closeable
{ {
private final SelectableChannel channel; private final SelectableChannel channel;
private final SelectionKey key; private final SelectionKey key;
@ -823,7 +834,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
private class DestroyEndPoint implements Runnable, Invocable, Closeable private class DestroyEndPoint implements Runnable, Closeable
{ {
private final EndPoint endPoint; private final EndPoint endPoint;