From ca3af688096687c85ec80e3173380f7d1fe45117 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 3 Nov 2016 11:44:36 +0100 Subject: [PATCH] Code cleanup. Clarified with comments how the 2 execution strategies work in ManagedSelector. Fixed computation of the minimum number of threads in Server. --- .../org/eclipse/jetty/io/ManagedSelector.java | 183 +++++++++--------- .../java/org/eclipse/jetty/server/Server.java | 34 ++-- 2 files changed, 113 insertions(+), 104 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index c016b38d06e..ffeeaeccf28 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -69,86 +70,14 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable private final ExecutionStrategy _lowPriorityStrategy; private Selector _selector; - private final Runnable _runStrategy = new Runnable() - { - @Override - public void run() - { - _strategy.produce(); - } - }; - - private final Runnable _runLowPriorityStrategy = new Runnable() - { - @Override - public void run() - { - Thread current = Thread.currentThread(); - String name = current.getName(); - int priority = current.getPriority(); - try - { - while (isRunning()) - { - try - { - current.setPriority(Thread.MIN_PRIORITY); - current.setName(name+"-lowPrioSelector"); - _lowPriorityStrategy.produce(); - } - catch (Throwable th) - { - LOG.warn(th); - } - } - } - finally - { - current.setPriority(priority); - current.setName(name); - } - } - }; - public ManagedSelector(SelectorManager selectorManager, int id) { _selectorManager = selectorManager; _id = id; SelectorProducer producer = new SelectorProducer(); - _strategy = new ExecuteProduceConsume(producer, selectorManager.getExecutor(), Invocable.InvocationType.BLOCKING); - _lowPriorityStrategy = new ProduceExecuteConsume(producer, selectorManager.getExecutor(), Invocable.InvocationType.BLOCKING) - { - @Override - protected boolean execute(Runnable task) - { - try - { - Invocable.InvocationType invocation=Invocable.getInvocationType(task); - if (LOG.isDebugEnabled()) - LOG.debug("Low Prio Selector execute {} {}",invocation,task); - switch (Invocable.getInvocationType(task)) - { - case NON_BLOCKING: - task.run(); - return true; - - case EITHER: - Invocable.invokeNonBlocking(task); - return true; - - default: - } - return super.execute(task); - } - finally - { - // Allow opportunity for main strategy to take over - Thread.yield(); - } - } - - - }; + Executor executor = selectorManager.getExecutor(); + _strategy = new ExecuteProduceConsume(producer, executor, Invocable.InvocationType.BLOCKING); + _lowPriorityStrategy = new LowPriorityProduceExecuteConsume(producer, executor); setStopTimeout(5000); } @@ -156,9 +85,38 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable protected void doStart() throws Exception { super.doStart(); + _selector = _selectorManager.newSelector(); - _selectorManager.execute(_runStrategy); - _selectorManager.execute(_runLowPriorityStrategy); + + // The producer used by the strategies will never + // be idle (either produces a task or blocks). + + // The normal strategy obtains the produced task, schedules + // a new thread to produce more, runs the task and then exits. + _selectorManager.execute(_strategy::produce); + + // The low priority strategy knows the producer will never + // be idle, that tasks are scheduled to run in different + // threads, therefore lowPriorityProduce() never exits. + _selectorManager.execute(this::lowPriorityProduce); + } + + private void lowPriorityProduce() + { + Thread current = Thread.currentThread(); + String name = current.getName(); + int priority = current.getPriority(); + current.setPriority(Thread.MIN_PRIORITY); + current.setName(name+"-lowPrioritySelector"); + try + { + _lowPriorityStrategy.produce(); + } + finally + { + current.setPriority(priority); + current.setName(name); + } } public int size() @@ -227,28 +185,75 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable void updateKey(); } + private static class LowPriorityProduceExecuteConsume extends ProduceExecuteConsume + { + private LowPriorityProduceExecuteConsume(SelectorProducer producer, Executor executor) + { + super(producer, executor, InvocationType.BLOCKING); + } + + @Override + protected boolean execute(Runnable task) + { + try + { + InvocationType invocation=Invocable.getInvocationType(task); + if (LOG.isDebugEnabled()) + LOG.debug("Low Priority Selector executing {} {}",invocation,task); + switch (invocation) + { + case NON_BLOCKING: + task.run(); + return true; + + case EITHER: + Invocable.invokeNonBlocking(task); + return true; + + default: + return super.execute(task); + } + } + finally + { + // Allow opportunity for main strategy to take over. + Thread.yield(); + } + } + } + private class SelectorProducer implements ExecutionStrategy.Producer { private Set _keys = Collections.emptySet(); private Iterator _cursor = Collections.emptyIterator(); @Override - public synchronized Runnable produce() + public Runnable produce() { - while (true) + // This method is called from both the + // normal and low priority strategies. + // Only one can produce at a time, so it's synchronized + // to enforce that only one strategy actually produces. + // When idle in select(), this method blocks; + // the other strategy's thread will be blocked + // waiting for this lock to be released. + synchronized (this) { - Runnable task = processSelected(); - if (task != null) - return task; + while (true) + { + Runnable task = processSelected(); + if (task != null) + return task; - Runnable action = nextAction(); - if (action != null) - return action; + Runnable action = nextAction(); + if (action != null) + return action; - update(); + update(); - if (!select()) - return null; + if (!select()) + return null; + } } } @@ -492,7 +497,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable public void destroyEndPoint(final EndPoint endPoint) { final Connection connection = endPoint.getConnection(); - submit((Runnable)() -> + submit(() -> { if (LOG.isDebugEnabled()) LOG.debug("Destroyed {}", endPoint); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java index ec78352bffd..c386e940099 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java @@ -378,8 +378,7 @@ public class Server extends HandlerWrapper implements Attributes HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION); - - // check size of thread pool + // Check that the thread pool size is enough. SizedThreadPool pool = getBean(SizedThreadPool.class); int max=pool==null?-1:pool.getMaxThreads(); int selectors=0; @@ -387,24 +386,29 @@ public class Server extends HandlerWrapper implements Attributes for (Connector connector : _connectors) { - if (!(connector instanceof AbstractConnector)) - continue; + if (connector instanceof AbstractConnector) + { + AbstractConnector abstractConnector = (AbstractConnector)connector; + Executor connectorExecutor = connector.getExecutor(); - AbstractConnector abstractConnector = (AbstractConnector) connector; - Executor connectorExecutor = connector.getExecutor(); + if (connectorExecutor != pool) + { + // Do not count the selectors and acceptors from this connector at + // the server level, because the connector uses a dedicated executor. + continue; + } - if (connectorExecutor != pool) - // Do not count the selectors and acceptors from this connector at server level, because connector uses dedicated executor. - continue; - - acceptors += abstractConnector.getAcceptors(); - - if (connector instanceof ServerConnector) - selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount(); + acceptors += abstractConnector.getAcceptors(); + if (connector instanceof ServerConnector) + { + // The SelectorManager uses 2 threads for each selector, + // one for the normal and one for the low priority strategies. + selectors += 2 * ((ServerConnector)connector).getSelectorManager().getSelectorCount(); + } + } } - int needed=1+selectors+acceptors; if (max>0 && needed>max) throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors));