diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 6a15415790e..0346f4c10e5 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -58,7 +58,6 @@ import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; public class ManagedSelector extends ContainerLifeCycle implements Dumpable { private static final Logger LOG = Log.getLogger(ManagedSelector.class); - private static final long MAX_ACTION_PERIOD_MS = Long.getLong("org.eclipse.jetty.io.ManagedSelector.MAX_ACTION_PERIOD_MS",100); private final Locker _locker = new Locker(); private boolean _selecting = false; @@ -67,7 +66,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable private final int _id; private final ExecutionStrategy _strategy; private Selector _selector; - private long _actionTime = -1; + private int _actionCount; public ManagedSelector(SelectorManager selectorManager, int id) { @@ -75,7 +74,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable _id = id; SelectorProducer producer = new SelectorProducer(); Executor executor = selectorManager.getExecutor(); - _strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class)); + _strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class)); addBean(_strategy,true); setStopTimeout(5000); } @@ -219,6 +218,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable submit(new DestroyEndPoint(endPoint)); } + private int getActionSize() + { + try (Locker.Lock lock = _locker.lock()) + { + return _actions.size(); + } + } + @Override public String dump() { @@ -249,11 +256,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable public String toString() { Selector selector = _selector; - return String.format("%s id=%s keys=%d selected=%d", + return String.format("%s id=%s keys=%d selected=%d actions=%d", super.toString(), _id, selector != null && selector.isOpen() ? selector.keys().size() : -1, - selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); + selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1, + getActionSize()); } /** @@ -304,7 +312,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable private Runnable nextAction() { - long now = System.nanoTime(); Selector selector = null; Runnable action = null; try (Locker.Lock lock = _locker.lock()) @@ -312,38 +319,48 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable // It is important to avoid live-lock (busy blocking) here. If too many actions // are submitted, this can indefinitely defer selection happening. Similarly if // we give too much priority to selection, it may prevent actions from being run. - // The solution implemented here is to put a maximum time limit on handling actions - // so that this method will fall through to selection if more than MAX_ACTION_PERIOD_MS - // is spent running actions. The time period is cleared whenever a selection occurs, - // so that a full period can be spent on actions after every select. - - if (_actionTime == -1) - { - _actionTime = now; + // The solution implemented here is to only process the number of actions that were + // originally in the action queue before attempting a select + + if (_actionCount==0) + { + // Calculate how many actions we are prepared to handle before selection + _actionCount = _actions.size(); + if (_actionCount>0) + action = _actions.poll(); + else + _selecting = true; } - else if ((now - _actionTime) > TimeUnit.MILLISECONDS.toNanos(MAX_ACTION_PERIOD_MS) && _actions.size() > 0) + else if (_actionCount==1) { - // Too much time spent handling actions, give selection a go, - // immediately waking up (as if remaining action were just added). - selector = _selector; - _selecting = false; - _actionTime = -1; + _actionCount = 0; + if (LOG.isDebugEnabled()) LOG.debug("Forcing selection, actions={}",_actions.size()); - } - - if (selector == null) - { - action = _actions.poll(); - if (action == null) + + if (_actions.size()==0) { - // No more actions, so we time to do some selecting + // This was the last action, so select normally _selecting = true; - _actionTime = -1; } + else + { + // there are still more actions to handle, so + // immediately wake up (as if remaining action were just added). + selector = _selector; + _selecting = false; + } + } + else + { + _actionCount--; + action = _actions.poll(); } } + if (LOG.isDebugEnabled()) + LOG.debug("action={} wakeup={}",action,selector!=null); + if (selector != null) selector.wakeup(); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentStack.java b/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentStack.java deleted file mode 100644 index a25be9306ac..00000000000 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentStack.java +++ /dev/null @@ -1,91 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.util; - -import java.util.concurrent.atomic.AtomicReference; - -/** - * ConcurrentStack - * - * Nonblocking stack using variation of Treiber's algorithm - * that allows for reduced garbage - */ -public class ConcurrentStack -{ - private final NodeStack stack = new NodeStack<>(); - - public void push(I item) - { - stack.push(new Holder(item)); - } - - public I pop() - { - Holder holder = stack.pop(); - if (holder==null) - return null; - return holder.item; - } - - private static class Holder extends Node - { - final I item; - - Holder(I item) - { - this.item = item; - } - } - - public static class Node - { - Node next; - } - - public static class NodeStack - { - AtomicReference stack = new AtomicReference(); - - public void push(N node) - { - while(true) - { - Node top = stack.get(); - node.next = top; - if (stack.compareAndSet(top,node)) - break; - } - } - - public N pop() - { - while (true) - { - Node top = stack.get(); - if (top==null) - return null; - if (stack.compareAndSet(top,top.next)) - { - top.next = null; - return (N)top; - } - } - } - } -} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 8a46d037e18..66d19e0ac9b 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -18,13 +18,13 @@ package org.eclipse.jetty.util.thread; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; -import org.eclipse.jetty.util.ConcurrentStack; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.AbstractLifeCycle; @@ -49,7 +49,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { @Override public void run() - {} + { + } @Override public String toString() @@ -60,10 +61,9 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo private final Executor _executor; private final int _capacity; - private final ConcurrentStack.NodeStack _stack; + private final ConcurrentLinkedDeque _stack; private final AtomicInteger _size = new AtomicInteger(); private final AtomicInteger _pending = new AtomicInteger(); - private final AtomicInteger _waiting = new AtomicInteger(); private ThreadPoolBudget.Lease _lease; private Object _owner; @@ -96,7 +96,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { _executor = executor; _capacity = reservedThreads(executor,capacity); - _stack = new ConcurrentStack.NodeStack<>(); + _stack = new ConcurrentLinkedDeque<>(); _owner = owner; LOG.debug("{}",this); @@ -145,12 +145,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo return _pending.get(); } - @ManagedAttribute(value = "waiting reserved threads", readonly = true) - public int getWaiting() - { - return _waiting.get(); - } - @ManagedAttribute(value = "idletimeout in MS", readonly = true) public long getIdleTimeoutMs() { @@ -186,17 +180,13 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo _lease.close(); while(true) { - ReservedThread thread = _stack.pop(); - if (thread==null) - { - super.doStop(); - return; - } - + ReservedThread thread = _stack.pollFirst(); + if (thread == null) + break; _size.decrementAndGet(); - thread.stop(); } + super.doStop(); } @Override @@ -218,7 +208,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo if (task==null) return false; - ReservedThread thread = _stack.pop(); + ReservedThread thread = _stack.pollFirst(); if (thread==null) { if (task!=STOP) @@ -263,16 +253,16 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo @Override public String toString() { - return String.format("%s@%s{s=%d/%d,p=%d,w=%d}", + return String.format("%s@%x{s=%d/%d,p=%d}@%s", getClass().getSimpleName(), - _owner != null ? _owner : Integer.toHexString(hashCode()), + hashCode(), _size.get(), _capacity, _pending.get(), - _waiting.get()); + _owner); } - private class ReservedThread extends ConcurrentStack.Node implements Runnable + private class ReservedThread implements Runnable { private final Locker _locker = new Locker(); private final Condition _wakeup = _locker.newCondition(); @@ -302,7 +292,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo LOG.debug("{} waiting", this); Runnable task = null; - while (isRunning() && task==null) + while (task==null) { boolean idle = false; @@ -312,7 +302,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { try { - _waiting.incrementAndGet(); if (_idleTime == 0) _wakeup.await(); else @@ -322,10 +311,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { LOG.ignore(e); } - finally - { - _waiting.decrementAndGet(); - } } task = _task; _task = null; @@ -347,7 +332,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo if (LOG.isDebugEnabled()) LOG.debug("{} task={}", this, task); - return task==null?STOP:task; + return task; } @Override @@ -355,8 +340,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { while (isRunning()) { - Runnable task = null; - // test and increment size BEFORE decrementing pending, // so that we don't have a race starting new pending. while(true) @@ -384,10 +367,10 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo // Insert ourselves in the stack. Size is already incremented, but // that only effects the decision to keep other threads reserved. - _stack.push(this); + _stack.offerFirst(this); // Wait for a task - task = reservedWait(); + Runnable task = reservedWait(); if (task==STOP) // return on STOP poison pill diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index 079e9e2684d..db26786c0e4 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -37,25 +37,25 @@ import org.eclipse.jetty.util.thread.Locker.Lock; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; /** - *

A strategy where the thread that produces will run the resulting task if it + *

A strategy where the thread that produces will run the resulting task if it * is possible to do so without thread starvation.

- * + * *

This strategy preemptively dispatches a thread as a pending producer, so that * when a thread produces a task it can immediately run the task and let the pending * producer thread take over producing. If necessary another thread will be dispatched - * to replace the pending producing thread. When operating in this pattern, the + * to replace the pending producing thread. When operating in this pattern, the * sub-strategy is called Execute Produce Consume (EPC) *

- *

However, if the task produced uses the {@link Invocable} API to indicate that - * it will not block, then the strategy will run it directly, regardless of the - * presence of a pending producing thread and then resume producing after the + *

However, if the task produced uses the {@link Invocable} API to indicate that + * it will not block, then the strategy will run it directly, regardless of the + * presence of a pending producing thread and then resume producing after the * task has completed. This sub-strategy is also used if the strategy has been * configured with a maximum of 0 pending threads and the thread currently producing * does not use the {@link Invocable} API to indicate that it will not block. * When operating in this pattern, the sub-strategy is called * ProduceConsume (PC). *

- *

If there is no pending producer thread available and if the task has not + *

If there is no pending producer thread available and if the task has not * indicated it is non-blocking, then this strategy will dispatch the execution of * the task and immediately continue producing. When operating in this pattern, the * sub-strategy is called ProduceExecuteConsume (PEC). @@ -67,7 +67,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat private static final Logger LOG = Log.getLogger(EatWhatYouKill.class); private enum State { IDLE, PRODUCING, REPRODUCING } - + private final Locker _locker = new Locker(); private final LongAdder _nonBlocking = new LongAdder(); private final LongAdder _blocking = new LongAdder(); @@ -86,7 +86,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat { this(producer,executor,new ReservedThreadExecutor(executor,maxReserved)); } - + public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers) { _producer = producer; @@ -108,11 +108,11 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat case IDLE: execute = true; break; - + case PRODUCING: _state = State.REPRODUCING; break; - + default: break; } @@ -153,13 +153,13 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat _state = State.PRODUCING; producing = true; break; - + case PRODUCING: // Keep other Thread producing if (reproduce) _state = State.REPRODUCING; break; - + default: break; } @@ -170,7 +170,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat public boolean doProduce() { boolean producing = true; - while (isRunning() && producing) + while (isRunning() && producing) { // If we got here, then we are the thread that is producing. Runnable task = null; @@ -269,7 +269,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat } } } - + return producing; } @@ -323,7 +323,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat getState(builder); return builder.toString(); } - + private void getString(StringBuilder builder) { builder.append(getClass().getSimpleName());