diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index d808f7f8fe2..f463f3ac0a5 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -48,6 +48,7 @@ import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume; import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume; @@ -57,7 +58,7 @@ import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume; * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated * with the channel.

*/ -public class ManagedSelector extends AbstractLifeCycle implements Dumpable +public class ManagedSelector extends ContainerLifeCycle implements Dumpable { private static final Logger LOG = Log.getLogger(ManagedSelector.class); @@ -67,7 +68,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable private final SelectorManager _selectorManager; private final int _id; private final ExecutionStrategy _strategy; - private final ExecutionStrategy _lowPriorityStrategy; private Selector _selector; public ManagedSelector(SelectorManager selectorManager, int id) @@ -76,8 +76,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable _id = id; SelectorProducer producer = new SelectorProducer(); Executor executor = selectorManager.getExecutor(); - _strategy = new ExecuteProduceConsume(producer, executor, Invocable.InvocationType.BLOCKING); - _lowPriorityStrategy = new LowPriorityProduceExecuteConsume(producer, executor); + _strategy = new EatWhatYouKill(producer, executor, Invocable.InvocationType.BLOCKING, Invocable.InvocationType.NON_BLOCKING); + addBean(_strategy); setStopTimeout(5000); } @@ -94,29 +94,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable // The normal strategy obtains the produced task, schedules // a new thread to produce more, runs the task and then exits. _selectorManager.execute(_strategy::produce); - - // The low priority strategy knows the producer will never - // be idle, that tasks are scheduled to run in different - // threads, therefore lowPriorityProduce() never exits. - _selectorManager.execute(this::lowPriorityProduce); - } - - private void lowPriorityProduce() - { - Thread current = Thread.currentThread(); - String name = current.getName(); - int priority = current.getPriority(); - current.setPriority(Thread.MIN_PRIORITY); - current.setName(name+"-lowPrioritySelector"); - try - { - _lowPriorityStrategy.produce(); - } - finally - { - current.setPriority(priority); - current.setName(name); - } } public int size() @@ -135,11 +112,12 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable CloseEndPoints close_endps = new CloseEndPoints(); submit(close_endps); close_endps.await(getStopTimeout()); - super.doStop(); CloseSelector close_selector = new CloseSelector(); submit(close_selector); close_selector.await(getStopTimeout()); + super.doStop(); + if (LOG.isDebugEnabled()) LOG.debug("Stopped {}", this); } @@ -185,42 +163,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable void updateKey(); } - private static class LowPriorityProduceExecuteConsume extends ProduceExecuteConsume - { - private LowPriorityProduceExecuteConsume(SelectorProducer producer, Executor executor) - { - super(producer, executor, InvocationType.BLOCKING); - } - - @Override - protected boolean execute(Runnable task) - { - try - { - InvocationType invocation=Invocable.getInvocationType(task); - if (LOG.isDebugEnabled()) - LOG.debug("Low Priority Selector executing {} {}",invocation,task); - switch (invocation) - { - case NON_BLOCKING: - task.run(); - return true; - - case EITHER: - Invocable.invokeNonBlocking(task); - return true; - - default: - return super.execute(task); - } - } - finally - { - // Allow opportunity for main strategy to take over. - Thread.yield(); - } - } - } private class SelectorProducer implements ExecutionStrategy.Producer { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java index 9875f8c0823..d8d3978db63 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java @@ -18,7 +18,13 @@ package org.eclipse.jetty.util.thread; +import java.io.Closeable; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; /** *

A task (typically either a {@link Runnable} or {@link Callable} @@ -179,4 +185,73 @@ public interface Invocable { return InvocationType.BLOCKING; } + + /** + * An Executor wrapper that knows about Invocable + * + */ + public static class InvocableExecutor implements Executor + { + private static final Logger LOG = Log.getLogger(InvocableExecutor.class); + + private final Executor _executor; + private final InvocationType _preferredInvocationType; + + public InvocableExecutor(Executor executor,InvocationType preferred) + { + _executor=executor; + _preferredInvocationType=preferred; + } + + public Invocable.InvocationType getPreferredInvocationType() + { + return _preferredInvocationType; + } + + public void invoke(Runnable task) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} invoke {}", this, task); + Invocable.invokePreferred(task,_preferredInvocationType); + if (LOG.isDebugEnabled()) + LOG.debug("{} invoked {}", this, task); + } + + public void execute(Runnable task) + { + tryExecute(task,_preferredInvocationType); + } + + public void execute(Runnable task, InvocationType preferred) + { + tryExecute(task,preferred); + } + + public boolean tryExecute(Runnable task, InvocationType preferred) + { + try + { + _executor.execute(Invocable.asPreferred(task,preferred)); + return true; + } + catch(RejectedExecutionException e) + { + // If we cannot execute, then close the task + LOG.debug(e); + LOG.warn("Rejected execution of {}",task); + try + { + if (task instanceof Closeable) + ((Closeable)task).close(); + } + catch (Exception x) + { + e.addSuppressed(x); + LOG.warn(e); + } + } + return false; + } + + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java new file mode 100644 index 00000000000..ff9595f7b3d --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -0,0 +1,345 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread.strategy; + +import java.util.concurrent.Executor; +import java.util.concurrent.locks.Condition; + +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor; +import org.eclipse.jetty.util.thread.Invocable.InvocationType; +import org.eclipse.jetty.util.thread.Locker; +import org.eclipse.jetty.util.thread.Locker.Lock; + +/** + *

A strategy where the thread that produces will always run the resulting task.

+ *

The strategy may then dispatch another thread to continue production.

+ *

The strategy is also known by the nickname 'eat what you kill', which comes from + * the hunting ethic that says a person should not kill anything he or she does not + * plan on eating. In this case, the phrase is used to mean that a thread should + * not produce a task that it does not intend to run. By making producers run the + * task that they have just produced avoids execution delays and avoids parallel slow + * down by running the task in the same core, with good chances of having a hot CPU + * cache. It also avoids the creation of a queue of produced tasks that the system + * does not yet have capacity to consume, which can save memory and exert back + * pressure on producers.

+ */ +public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrategy, Runnable +{ + private static final Logger LOG = Log.getLogger(EatWhatYouKill.class); + + enum State { IDLE, PRODUCING, REPRODUCING }; + + private final Locker _locker = new Locker(); + private State _state = State.IDLE; + private final Runnable _runProduce = new RunProduce(); + private final Producer _producer; + private final InvocableExecutor _executor; + private final InvocationType _executeType; + private int _pendingProducersMax; + private int _pendingProducers; + private int _pendingProducersDispatched; + private Condition _produce = _locker.newCondition(); + + public EatWhatYouKill(Producer producer, Executor executor) + { + this(producer,executor,InvocationType.BLOCKING,InvocationType.NON_BLOCKING); + } + + public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocation, InvocationType preferredExecution) + { + this(producer,executor,preferredInvocation,preferredExecution,1); + } + + public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocation, InvocationType preferredExecution, int maxProducersPending ) + { + _producer = producer; + _pendingProducersMax = maxProducersPending; + _executor = new InvocableExecutor(executor,preferredInvocation); + _executeType = preferredExecution; + } + + @Override + public void produce() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} execute", this); + + boolean produce; + try (Lock locked = _locker.lock()) + { + switch(_state) + { + case IDLE: + _state = State.PRODUCING; + produce = true; + break; + + case PRODUCING: + _state = State.REPRODUCING; + produce = false; + break; + + default: + produce = false; + } + } + + if (produce) + produceConsume(); + } + + @Override + public void dispatch() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} spawning", this); + boolean dispatch = false; + try (Lock locked = _locker.lock()) + { + switch(_state) + { + case IDLE: + dispatch = true; + break; + + case PRODUCING: + _state = State.REPRODUCING; + dispatch = false; + break; + + default: + dispatch = false; + } + } + if (dispatch) + _executor.execute(_runProduce); + } + + @Override + public void run() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} run", this); + if (!isRunning()) + return; + boolean producing; + try (Lock locked = _locker.lock()) + { + _pendingProducersDispatched--; + producing = pendingProducerWait(); + } + + if (producing) + produceConsume(); + } + + private boolean pendingProducerWait() + { + if (_pendingProducers<_pendingProducersMax) + { + try + { + _pendingProducers++; + _produce.await(); + if (_state == State.IDLE) + { + _state = State.PRODUCING; + return true; + } + } + catch (InterruptedException e) + { + LOG.debug(e); + // probably spurious, but we are not pending anymore + _pendingProducers--; + } + } + return false; + } + + private void produceConsume() + { + if (LOG.isDebugEnabled()) + LOG.debug("{} produce enter", this); + + producing: while (isRunning()) + { + // If we got here, then we are the thread that is producing. + if (LOG.isDebugEnabled()) + LOG.debug("{} producing", this); + + Runnable task = _producer.produce(); + + if (LOG.isDebugEnabled()) + LOG.debug("{} produced {}", this, task); + + boolean may_block_caller = !Invocable.isNonBlockingInvocation(); + boolean dispatch_new_producer = false; + boolean eat_it; + boolean keep_producing; + + try (Lock locked = _locker.lock()) + { + // Did we produced a task? + if (task == null) + { + // There is no task. + // Could another one just have been queued with a produce call? + if (_state==State.REPRODUCING) + { + _state = State.PRODUCING; + continue producing; + } + + // ... and no additional calls to execute, so we are idle + _state = State.IDLE; + keep_producing = false; + break producing; + } + + // Will we eat our own kill? + if (may_block_caller && Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING) + { + eat_it = true; + keep_producing = true; + } + else if (_pendingProducers==0) + { + keep_producing = true; + eat_it = false; + if ((_pendingProducersDispatched + _pendingProducers)<_pendingProducersMax) + { + _pendingProducersDispatched++; + dispatch_new_producer = true; + } + } + else + { + eat_it = true; + keep_producing = false; + dispatch_new_producer = true; + _pendingProducersDispatched++; + _state = State.IDLE; + _pendingProducers--; + _produce.signal(); + } + } + if (LOG.isDebugEnabled()) + LOG.debug("{} mbc={} dnp={} ei={} kp={}", this,may_block_caller,dispatch_new_producer,eat_it,keep_producing); + + // Run or execute the task. + if (task != null) + {; + if (eat_it) + _executor.invoke(task); + else + _executor.execute(task,_executeType); + } + + // If we need more producers + if (dispatch_new_producer) + { + // Spawn a new thread to continue production by running the produce loop. + _executor.execute(this); + } + + // Once we have run the task, we can try producing again. + if (keep_producing) + continue producing; + + try (Lock locked = _locker.lock()) + { + switch(_state) + { + case IDLE: + _state = State.PRODUCING; + continue producing; + + default: + // Perhaps we can be a pending Producer? + if (pendingProducerWait()) + continue producing; + } + } + + break producing; + } + if (LOG.isDebugEnabled()) + LOG.debug("{} produce exit",this); + } + + public Boolean isIdle() + { + try (Lock locked = _locker.lock()) + { + return _state==State.IDLE; + } + } + + @Override + protected void doStop() throws Exception + { + try (Lock locked = _locker.lock()) + { + _pendingProducers=0; + _produce.signalAll(); + } + } + + public String toString() + { + StringBuilder builder = new StringBuilder(); + builder.append(super.toString()); + builder.append('/'); + try (Lock locked = _locker.lock()) + { + builder.append(_state); + builder.append('/'); + builder.append(_pendingProducers); + builder.append('/'); + builder.append(_pendingProducersMax); + builder.append('/'); + } + builder.append(_producer); + return builder.toString(); + } + + private class RunProduce implements Runnable + { + @Override + public void run() + { + produce(); + } + } + + public static class Factory implements ExecutionStrategy.Factory + { + @Override + public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor) + { + return new EatWhatYouKill(producer, executor); + } + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java index 70a4f352fb0..3e5aea9cd1b 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java @@ -24,6 +24,7 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor; import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Locker.Lock; @@ -41,13 +42,14 @@ import org.eclipse.jetty.util.thread.Locker.Lock; * does not yet have capacity to consume, which can save memory and exert back * pressure on producers.

*/ -public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements ExecutionStrategy, Runnable +public class ExecuteProduceConsume implements ExecutionStrategy, Runnable { private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class); private final Locker _locker = new Locker(); private final Runnable _runProduce = new RunProduce(); private final Producer _producer; + private final InvocableExecutor _executor; private boolean _idle = true; private boolean _execute; private boolean _producing; @@ -61,10 +63,10 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements public ExecuteProduceConsume(Producer producer, Executor executor, InvocationType preferred ) { - super(executor,preferred); this._producer = producer; + _executor = new InvocableExecutor(executor,preferred); } - + @Override public void produce() { @@ -111,7 +113,7 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements _execute = true; } if (dispatch) - execute(_runProduce); + _executor.execute(_runProduce); } @Override @@ -190,15 +192,14 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements // Spawn a new thread to continue production by running the produce loop. if (LOG.isDebugEnabled()) LOG.debug("{} dispatch", this); - if (!execute(this)) - task = null; + _executor.execute(task); } // Run the task. if (LOG.isDebugEnabled()) LOG.debug("{} run {}", this, task); if (task != null) - invoke(task); + _executor.invoke(task); if (LOG.isDebugEnabled()) LOG.debug("{} ran {}", this, task); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java deleted file mode 100644 index 45b308d63be..00000000000 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java +++ /dev/null @@ -1,88 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.util.thread.strategy; - -import java.io.Closeable; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; - -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.Invocable; - -/** - *

Base class for strategies that need to execute a task by submitting it to an {@link Executor}.

- *

If the submission to the {@code Executor} is rejected (via a {@link RejectedExecutionException}), - * the task is tested whether it implements {@link Closeable}; if it does, then {@link Closeable#close()} - * is called on the task object.

- */ -public abstract class ExecutingExecutionStrategy implements ExecutionStrategy -{ - private static final Logger LOG = Log.getLogger(ExecutingExecutionStrategy.class); - - private final Executor _executor; - private final Invocable.InvocationType _preferredInvocationType; - - protected ExecutingExecutionStrategy(Executor executor,Invocable.InvocationType preferred) - { - _executor=executor; - _preferredInvocationType=preferred; - } - - public Invocable.InvocationType getPreferredInvocationType() - { - return _preferredInvocationType; - } - - public void invoke(Runnable task) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} invoke {}", this, task); - Invocable.invokePreferred(task,_preferredInvocationType); - if (LOG.isDebugEnabled()) - LOG.debug("{} invoked {}", this, task); - } - - protected boolean execute(Runnable task) - { - try - { - _executor.execute(Invocable.asPreferred(task,_preferredInvocationType)); - return true; - } - catch(RejectedExecutionException e) - { - // If we cannot execute, then close the task and keep producing. - LOG.debug(e); - LOG.warn("Rejected execution of {}",task); - try - { - if (task instanceof Closeable) - ((Closeable)task).close(); - } - catch (Exception x) - { - e.addSuppressed(x); - LOG.warn(e); - } - } - return false; - } -} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java index e32095d705e..e8c92712a2a 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java @@ -23,7 +23,8 @@ import java.util.concurrent.Executor; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.Invocable.InvocationType; +import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Locker.Lock; @@ -31,23 +32,24 @@ import org.eclipse.jetty.util.thread.Locker.Lock; *

A strategy where the caller thread iterates over task production, submitting each * task to an {@link Executor} for execution.

*/ -public class ProduceExecuteConsume extends ExecutingExecutionStrategy implements ExecutionStrategy +public class ProduceExecuteConsume implements ExecutionStrategy { private static final Logger LOG = Log.getLogger(ProduceExecuteConsume.class); private final Locker _locker = new Locker(); private final Producer _producer; + private final InvocableExecutor _executor; private State _state = State.IDLE; public ProduceExecuteConsume(Producer producer, Executor executor) { - this(producer,executor,Invocable.InvocationType.NON_BLOCKING); + this(producer,executor,InvocationType.NON_BLOCKING); } - public ProduceExecuteConsume(Producer producer, Executor executor, Invocable.InvocationType preferred) + public ProduceExecuteConsume(Producer producer, Executor executor, InvocationType preferred) { - super(executor,preferred); - this._producer = producer; + _producer = producer; + _executor = new InvocableExecutor(executor,preferred); } @Override @@ -95,7 +97,7 @@ public class ProduceExecuteConsume extends ExecutingExecutionStrategy implements } // Execute the task. - execute(task); + _executor.execute(task); } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java new file mode 100644 index 00000000000..6881b19c119 --- /dev/null +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java @@ -0,0 +1,226 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread.strategy; + + +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ExecutionStrategyTest +{ + @Parameterized.Parameters(name = "{0}") + public static Iterable data() + { + return Arrays.asList(new Object[][]{ + {ProduceExecuteConsume.class}, + {ExecuteProduceConsume.class}, + {EatWhatYouKill.class} + }); + } + + QueuedThreadPool threads = new QueuedThreadPool(20); + Class _strategyClass; + ExecutionStrategy _strategy; + + public ExecutionStrategyTest(Class strategy) + { + _strategyClass = strategy; + } + + void newExecutionStrategy(Producer producer, Executor executor) throws Exception + { + _strategy = _strategyClass.getConstructor(Producer.class,Executor.class).newInstance(producer,executor); + if (_strategy instanceof LifeCycle) + ((LifeCycle)_strategy).start(); + } + + @Before + public void before() throws Exception + { + threads.start(); + } + + @After + public void after() throws Exception + { + if (_strategy instanceof LifeCycle) + ((LifeCycle)_strategy).stop(); + threads.stop(); + } + + public static abstract class TestProducer implements Producer + { + @Override + public String toString() + { + return "TestProducer"; + } + } + + @Test + public void idleTest() throws Exception + { + AtomicInteger count = new AtomicInteger(0); + Producer producer = new TestProducer() + { + @Override + public Runnable produce() + { + count.incrementAndGet(); + return null; + } + }; + + newExecutionStrategy(producer,threads); + _strategy.produce(); + assertThat(count.get(),greaterThan(0)); + } + + @Test + public void simpleTest() throws Exception + { + final int TASKS = 3*threads.getMaxThreads(); + final CountDownLatch latch = new CountDownLatch(TASKS); + Producer producer = new TestProducer() + { + int tasks = TASKS; + @Override + public Runnable produce() + { + if (tasks-->0) + { + return new Runnable() + { + @Override + public void run() + { + latch.countDown(); + } + }; + } + + return null; + } + }; + + newExecutionStrategy(producer,threads); + + for (int p=0; latch.getCount()>0 && p q = new ArrayBlockingQueue<>(500); + + Producer producer = new TestProducer() + { + int tasks=TASKS; + @Override + public Runnable produce() + { + if (tasks-->0) + { + try + { + final CountDownLatch latch = q.poll(10,TimeUnit.SECONDS); + + if (latch!=null) + { + return new Runnable() + { + @Override + public void run() + { + latch.countDown(); + } + }; + } + } + catch(InterruptedException e) + { + e.printStackTrace(); + } + } + + return null; + } + }; + + newExecutionStrategy(producer,threads); + + final CountDownLatch latch = new CountDownLatch(TASKS); + threads.execute(new Runnable() + { + @Override + public void run() + { + try + { + for (int t=TASKS;t-->0;) + { + Thread.sleep(20); + q.offer(latch); + } + } + catch(Exception e) + { + e.printStackTrace(); + } + } + }); + + + for (int p=0; latch.getCount()>0 && p