Code cleanup.

Clarified with comments how the 2 execution strategies work in ManagedSelector.

Fixed computation of the minimum number of threads in Server.
This commit is contained in:
Simone Bordet 2016-11-03 11:44:36 +01:00
parent c6436c34ad
commit ca3af68809
2 changed files with 113 additions and 104 deletions

View File

@ -34,6 +34,7 @@ import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -69,86 +70,14 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable
private final ExecutionStrategy _lowPriorityStrategy; private final ExecutionStrategy _lowPriorityStrategy;
private Selector _selector; private Selector _selector;
private final Runnable _runStrategy = new Runnable()
{
@Override
public void run()
{
_strategy.produce();
}
};
private final Runnable _runLowPriorityStrategy = new Runnable()
{
@Override
public void run()
{
Thread current = Thread.currentThread();
String name = current.getName();
int priority = current.getPriority();
try
{
while (isRunning())
{
try
{
current.setPriority(Thread.MIN_PRIORITY);
current.setName(name+"-lowPrioSelector");
_lowPriorityStrategy.produce();
}
catch (Throwable th)
{
LOG.warn(th);
}
}
}
finally
{
current.setPriority(priority);
current.setName(name);
}
}
};
public ManagedSelector(SelectorManager selectorManager, int id) public ManagedSelector(SelectorManager selectorManager, int id)
{ {
_selectorManager = selectorManager; _selectorManager = selectorManager;
_id = id; _id = id;
SelectorProducer producer = new SelectorProducer(); SelectorProducer producer = new SelectorProducer();
_strategy = new ExecuteProduceConsume(producer, selectorManager.getExecutor(), Invocable.InvocationType.BLOCKING); Executor executor = selectorManager.getExecutor();
_lowPriorityStrategy = new ProduceExecuteConsume(producer, selectorManager.getExecutor(), Invocable.InvocationType.BLOCKING) _strategy = new ExecuteProduceConsume(producer, executor, Invocable.InvocationType.BLOCKING);
{ _lowPriorityStrategy = new LowPriorityProduceExecuteConsume(producer, executor);
@Override
protected boolean execute(Runnable task)
{
try
{
Invocable.InvocationType invocation=Invocable.getInvocationType(task);
if (LOG.isDebugEnabled())
LOG.debug("Low Prio Selector execute {} {}",invocation,task);
switch (Invocable.getInvocationType(task))
{
case NON_BLOCKING:
task.run();
return true;
case EITHER:
Invocable.invokeNonBlocking(task);
return true;
default:
}
return super.execute(task);
}
finally
{
// Allow opportunity for main strategy to take over
Thread.yield();
}
}
};
setStopTimeout(5000); setStopTimeout(5000);
} }
@ -156,9 +85,38 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
super.doStart(); super.doStart();
_selector = _selectorManager.newSelector(); _selector = _selectorManager.newSelector();
_selectorManager.execute(_runStrategy);
_selectorManager.execute(_runLowPriorityStrategy); // The producer used by the strategies will never
// be idle (either produces a task or blocks).
// The normal strategy obtains the produced task, schedules
// a new thread to produce more, runs the task and then exits.
_selectorManager.execute(_strategy::produce);
// The low priority strategy knows the producer will never
// be idle, that tasks are scheduled to run in different
// threads, therefore lowPriorityProduce() never exits.
_selectorManager.execute(this::lowPriorityProduce);
}
private void lowPriorityProduce()
{
Thread current = Thread.currentThread();
String name = current.getName();
int priority = current.getPriority();
current.setPriority(Thread.MIN_PRIORITY);
current.setName(name+"-lowPrioritySelector");
try
{
_lowPriorityStrategy.produce();
}
finally
{
current.setPriority(priority);
current.setName(name);
}
} }
public int size() public int size()
@ -227,13 +185,59 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable
void updateKey(); void updateKey();
} }
private static class LowPriorityProduceExecuteConsume extends ProduceExecuteConsume
{
private LowPriorityProduceExecuteConsume(SelectorProducer producer, Executor executor)
{
super(producer, executor, InvocationType.BLOCKING);
}
@Override
protected boolean execute(Runnable task)
{
try
{
InvocationType invocation=Invocable.getInvocationType(task);
if (LOG.isDebugEnabled())
LOG.debug("Low Priority Selector executing {} {}",invocation,task);
switch (invocation)
{
case NON_BLOCKING:
task.run();
return true;
case EITHER:
Invocable.invokeNonBlocking(task);
return true;
default:
return super.execute(task);
}
}
finally
{
// Allow opportunity for main strategy to take over.
Thread.yield();
}
}
}
private class SelectorProducer implements ExecutionStrategy.Producer private class SelectorProducer implements ExecutionStrategy.Producer
{ {
private Set<SelectionKey> _keys = Collections.emptySet(); private Set<SelectionKey> _keys = Collections.emptySet();
private Iterator<SelectionKey> _cursor = Collections.emptyIterator(); private Iterator<SelectionKey> _cursor = Collections.emptyIterator();
@Override @Override
public synchronized Runnable produce() public Runnable produce()
{
// This method is called from both the
// normal and low priority strategies.
// Only one can produce at a time, so it's synchronized
// to enforce that only one strategy actually produces.
// When idle in select(), this method blocks;
// the other strategy's thread will be blocked
// waiting for this lock to be released.
synchronized (this)
{ {
while (true) while (true)
{ {
@ -251,6 +255,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable
return null; return null;
} }
} }
}
private Runnable nextAction() private Runnable nextAction()
{ {
@ -492,7 +497,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable
public void destroyEndPoint(final EndPoint endPoint) public void destroyEndPoint(final EndPoint endPoint)
{ {
final Connection connection = endPoint.getConnection(); final Connection connection = endPoint.getConnection();
submit((Runnable)() -> submit(() ->
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Destroyed {}", endPoint); LOG.debug("Destroyed {}", endPoint);

View File

@ -378,8 +378,7 @@ public class Server extends HandlerWrapper implements Attributes
HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION); HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION);
// Check that the thread pool size is enough.
// check size of thread pool
SizedThreadPool pool = getBean(SizedThreadPool.class); SizedThreadPool pool = getBean(SizedThreadPool.class);
int max=pool==null?-1:pool.getMaxThreads(); int max=pool==null?-1:pool.getMaxThreads();
int selectors=0; int selectors=0;
@ -387,23 +386,28 @@ public class Server extends HandlerWrapper implements Attributes
for (Connector connector : _connectors) for (Connector connector : _connectors)
{ {
if (!(connector instanceof AbstractConnector)) if (connector instanceof AbstractConnector)
continue; {
AbstractConnector abstractConnector = (AbstractConnector)connector; AbstractConnector abstractConnector = (AbstractConnector)connector;
Executor connectorExecutor = connector.getExecutor(); Executor connectorExecutor = connector.getExecutor();
if (connectorExecutor != pool) if (connectorExecutor != pool)
// Do not count the selectors and acceptors from this connector at server level, because connector uses dedicated executor. {
// Do not count the selectors and acceptors from this connector at
// the server level, because the connector uses a dedicated executor.
continue; continue;
}
acceptors += abstractConnector.getAcceptors(); acceptors += abstractConnector.getAcceptors();
if (connector instanceof ServerConnector) if (connector instanceof ServerConnector)
selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount(); {
// The SelectorManager uses 2 threads for each selector,
// one for the normal and one for the low priority strategies.
selectors += 2 * ((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
}
} }
int needed=1+selectors+acceptors; int needed=1+selectors+acceptors;
if (max>0 && needed>max) if (max>0 && needed>max)