Experiement enhancement to EatWhatYouKill ExecutionStrategy
Use the existence of a pending producer threads to determine if low resources or not.
This commit is contained in:
parent
998788d9fa
commit
c097db32e7
|
@ -48,6 +48,7 @@ import org.eclipse.jetty.util.thread.Invocable;
|
||||||
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
|
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
|
||||||
import org.eclipse.jetty.util.thread.Locker;
|
import org.eclipse.jetty.util.thread.Locker;
|
||||||
import org.eclipse.jetty.util.thread.Scheduler;
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
|
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
|
||||||
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
|
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
|
||||||
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
|
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
|
||||||
|
|
||||||
|
@ -57,7 +58,7 @@ import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
|
||||||
* happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
|
* happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
|
||||||
* with the channel.</p>
|
* with the channel.</p>
|
||||||
*/
|
*/
|
||||||
public class ManagedSelector extends AbstractLifeCycle implements Dumpable
|
public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
||||||
{
|
{
|
||||||
private static final Logger LOG = Log.getLogger(ManagedSelector.class);
|
private static final Logger LOG = Log.getLogger(ManagedSelector.class);
|
||||||
|
|
||||||
|
@ -67,7 +68,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable
|
||||||
private final SelectorManager _selectorManager;
|
private final SelectorManager _selectorManager;
|
||||||
private final int _id;
|
private final int _id;
|
||||||
private final ExecutionStrategy _strategy;
|
private final ExecutionStrategy _strategy;
|
||||||
private final ExecutionStrategy _lowPriorityStrategy;
|
|
||||||
private Selector _selector;
|
private Selector _selector;
|
||||||
|
|
||||||
public ManagedSelector(SelectorManager selectorManager, int id)
|
public ManagedSelector(SelectorManager selectorManager, int id)
|
||||||
|
@ -76,8 +76,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable
|
||||||
_id = id;
|
_id = id;
|
||||||
SelectorProducer producer = new SelectorProducer();
|
SelectorProducer producer = new SelectorProducer();
|
||||||
Executor executor = selectorManager.getExecutor();
|
Executor executor = selectorManager.getExecutor();
|
||||||
_strategy = new ExecuteProduceConsume(producer, executor, Invocable.InvocationType.BLOCKING);
|
_strategy = new EatWhatYouKill(producer, executor, Invocable.InvocationType.BLOCKING, Invocable.InvocationType.NON_BLOCKING);
|
||||||
_lowPriorityStrategy = new LowPriorityProduceExecuteConsume(producer, executor);
|
addBean(_strategy);
|
||||||
setStopTimeout(5000);
|
setStopTimeout(5000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,29 +94,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable
|
||||||
// The normal strategy obtains the produced task, schedules
|
// The normal strategy obtains the produced task, schedules
|
||||||
// a new thread to produce more, runs the task and then exits.
|
// a new thread to produce more, runs the task and then exits.
|
||||||
_selectorManager.execute(_strategy::produce);
|
_selectorManager.execute(_strategy::produce);
|
||||||
|
|
||||||
// The low priority strategy knows the producer will never
|
|
||||||
// be idle, that tasks are scheduled to run in different
|
|
||||||
// threads, therefore lowPriorityProduce() never exits.
|
|
||||||
_selectorManager.execute(this::lowPriorityProduce);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void lowPriorityProduce()
|
|
||||||
{
|
|
||||||
Thread current = Thread.currentThread();
|
|
||||||
String name = current.getName();
|
|
||||||
int priority = current.getPriority();
|
|
||||||
current.setPriority(Thread.MIN_PRIORITY);
|
|
||||||
current.setName(name+"-lowPrioritySelector");
|
|
||||||
try
|
|
||||||
{
|
|
||||||
_lowPriorityStrategy.produce();
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
current.setPriority(priority);
|
|
||||||
current.setName(name);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public int size()
|
public int size()
|
||||||
|
@ -135,11 +112,12 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable
|
||||||
CloseEndPoints close_endps = new CloseEndPoints();
|
CloseEndPoints close_endps = new CloseEndPoints();
|
||||||
submit(close_endps);
|
submit(close_endps);
|
||||||
close_endps.await(getStopTimeout());
|
close_endps.await(getStopTimeout());
|
||||||
super.doStop();
|
|
||||||
CloseSelector close_selector = new CloseSelector();
|
CloseSelector close_selector = new CloseSelector();
|
||||||
submit(close_selector);
|
submit(close_selector);
|
||||||
close_selector.await(getStopTimeout());
|
close_selector.await(getStopTimeout());
|
||||||
|
|
||||||
|
super.doStop();
|
||||||
|
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Stopped {}", this);
|
LOG.debug("Stopped {}", this);
|
||||||
}
|
}
|
||||||
|
@ -185,42 +163,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Dumpable
|
||||||
void updateKey();
|
void updateKey();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class LowPriorityProduceExecuteConsume extends ProduceExecuteConsume
|
|
||||||
{
|
|
||||||
private LowPriorityProduceExecuteConsume(SelectorProducer producer, Executor executor)
|
|
||||||
{
|
|
||||||
super(producer, executor, InvocationType.BLOCKING);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean execute(Runnable task)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
InvocationType invocation=Invocable.getInvocationType(task);
|
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug("Low Priority Selector executing {} {}",invocation,task);
|
|
||||||
switch (invocation)
|
|
||||||
{
|
|
||||||
case NON_BLOCKING:
|
|
||||||
task.run();
|
|
||||||
return true;
|
|
||||||
|
|
||||||
case EITHER:
|
|
||||||
Invocable.invokeNonBlocking(task);
|
|
||||||
return true;
|
|
||||||
|
|
||||||
default:
|
|
||||||
return super.execute(task);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
// Allow opportunity for main strategy to take over.
|
|
||||||
Thread.yield();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class SelectorProducer implements ExecutionStrategy.Producer
|
private class SelectorProducer implements ExecutionStrategy.Producer
|
||||||
{
|
{
|
||||||
|
|
|
@ -18,7 +18,13 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.util.thread;
|
package org.eclipse.jetty.util.thread;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>A task (typically either a {@link Runnable} or {@link Callable}
|
* <p>A task (typically either a {@link Runnable} or {@link Callable}
|
||||||
|
@ -179,4 +185,73 @@ public interface Invocable
|
||||||
{
|
{
|
||||||
return InvocationType.BLOCKING;
|
return InvocationType.BLOCKING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An Executor wrapper that knows about Invocable
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public static class InvocableExecutor implements Executor
|
||||||
|
{
|
||||||
|
private static final Logger LOG = Log.getLogger(InvocableExecutor.class);
|
||||||
|
|
||||||
|
private final Executor _executor;
|
||||||
|
private final InvocationType _preferredInvocationType;
|
||||||
|
|
||||||
|
public InvocableExecutor(Executor executor,InvocationType preferred)
|
||||||
|
{
|
||||||
|
_executor=executor;
|
||||||
|
_preferredInvocationType=preferred;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Invocable.InvocationType getPreferredInvocationType()
|
||||||
|
{
|
||||||
|
return _preferredInvocationType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void invoke(Runnable task)
|
||||||
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} invoke {}", this, task);
|
||||||
|
Invocable.invokePreferred(task,_preferredInvocationType);
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} invoked {}", this, task);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void execute(Runnable task)
|
||||||
|
{
|
||||||
|
tryExecute(task,_preferredInvocationType);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void execute(Runnable task, InvocationType preferred)
|
||||||
|
{
|
||||||
|
tryExecute(task,preferred);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean tryExecute(Runnable task, InvocationType preferred)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_executor.execute(Invocable.asPreferred(task,preferred));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
catch(RejectedExecutionException e)
|
||||||
|
{
|
||||||
|
// If we cannot execute, then close the task
|
||||||
|
LOG.debug(e);
|
||||||
|
LOG.warn("Rejected execution of {}",task);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (task instanceof Closeable)
|
||||||
|
((Closeable)task).close();
|
||||||
|
}
|
||||||
|
catch (Exception x)
|
||||||
|
{
|
||||||
|
e.addSuppressed(x);
|
||||||
|
LOG.warn(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,345 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.util.thread.strategy;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.locks.Condition;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||||
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
|
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
||||||
|
import org.eclipse.jetty.util.thread.Invocable;
|
||||||
|
import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor;
|
||||||
|
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
|
||||||
|
import org.eclipse.jetty.util.thread.Locker;
|
||||||
|
import org.eclipse.jetty.util.thread.Locker.Lock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>A strategy where the thread that produces will always run the resulting task.</p>
|
||||||
|
* <p>The strategy may then dispatch another thread to continue production.</p>
|
||||||
|
* <p>The strategy is also known by the nickname 'eat what you kill', which comes from
|
||||||
|
* the hunting ethic that says a person should not kill anything he or she does not
|
||||||
|
* plan on eating. In this case, the phrase is used to mean that a thread should
|
||||||
|
* not produce a task that it does not intend to run. By making producers run the
|
||||||
|
* task that they have just produced avoids execution delays and avoids parallel slow
|
||||||
|
* down by running the task in the same core, with good chances of having a hot CPU
|
||||||
|
* cache. It also avoids the creation of a queue of produced tasks that the system
|
||||||
|
* does not yet have capacity to consume, which can save memory and exert back
|
||||||
|
* pressure on producers.</p>
|
||||||
|
*/
|
||||||
|
public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrategy, Runnable
|
||||||
|
{
|
||||||
|
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);
|
||||||
|
|
||||||
|
enum State { IDLE, PRODUCING, REPRODUCING };
|
||||||
|
|
||||||
|
private final Locker _locker = new Locker();
|
||||||
|
private State _state = State.IDLE;
|
||||||
|
private final Runnable _runProduce = new RunProduce();
|
||||||
|
private final Producer _producer;
|
||||||
|
private final InvocableExecutor _executor;
|
||||||
|
private final InvocationType _executeType;
|
||||||
|
private int _pendingProducersMax;
|
||||||
|
private int _pendingProducers;
|
||||||
|
private int _pendingProducersDispatched;
|
||||||
|
private Condition _produce = _locker.newCondition();
|
||||||
|
|
||||||
|
public EatWhatYouKill(Producer producer, Executor executor)
|
||||||
|
{
|
||||||
|
this(producer,executor,InvocationType.BLOCKING,InvocationType.NON_BLOCKING);
|
||||||
|
}
|
||||||
|
|
||||||
|
public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocation, InvocationType preferredExecution)
|
||||||
|
{
|
||||||
|
this(producer,executor,preferredInvocation,preferredExecution,1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocation, InvocationType preferredExecution, int maxProducersPending )
|
||||||
|
{
|
||||||
|
_producer = producer;
|
||||||
|
_pendingProducersMax = maxProducersPending;
|
||||||
|
_executor = new InvocableExecutor(executor,preferredInvocation);
|
||||||
|
_executeType = preferredExecution;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void produce()
|
||||||
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} execute", this);
|
||||||
|
|
||||||
|
boolean produce;
|
||||||
|
try (Lock locked = _locker.lock())
|
||||||
|
{
|
||||||
|
switch(_state)
|
||||||
|
{
|
||||||
|
case IDLE:
|
||||||
|
_state = State.PRODUCING;
|
||||||
|
produce = true;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case PRODUCING:
|
||||||
|
_state = State.REPRODUCING;
|
||||||
|
produce = false;
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
produce = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (produce)
|
||||||
|
produceConsume();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dispatch()
|
||||||
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} spawning", this);
|
||||||
|
boolean dispatch = false;
|
||||||
|
try (Lock locked = _locker.lock())
|
||||||
|
{
|
||||||
|
switch(_state)
|
||||||
|
{
|
||||||
|
case IDLE:
|
||||||
|
dispatch = true;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case PRODUCING:
|
||||||
|
_state = State.REPRODUCING;
|
||||||
|
dispatch = false;
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
dispatch = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (dispatch)
|
||||||
|
_executor.execute(_runProduce);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} run", this);
|
||||||
|
if (!isRunning())
|
||||||
|
return;
|
||||||
|
boolean producing;
|
||||||
|
try (Lock locked = _locker.lock())
|
||||||
|
{
|
||||||
|
_pendingProducersDispatched--;
|
||||||
|
producing = pendingProducerWait();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (producing)
|
||||||
|
produceConsume();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean pendingProducerWait()
|
||||||
|
{
|
||||||
|
if (_pendingProducers<_pendingProducersMax)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_pendingProducers++;
|
||||||
|
_produce.await();
|
||||||
|
if (_state == State.IDLE)
|
||||||
|
{
|
||||||
|
_state = State.PRODUCING;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InterruptedException e)
|
||||||
|
{
|
||||||
|
LOG.debug(e);
|
||||||
|
// probably spurious, but we are not pending anymore
|
||||||
|
_pendingProducers--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void produceConsume()
|
||||||
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} produce enter", this);
|
||||||
|
|
||||||
|
producing: while (isRunning())
|
||||||
|
{
|
||||||
|
// If we got here, then we are the thread that is producing.
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} producing", this);
|
||||||
|
|
||||||
|
Runnable task = _producer.produce();
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} produced {}", this, task);
|
||||||
|
|
||||||
|
boolean may_block_caller = !Invocable.isNonBlockingInvocation();
|
||||||
|
boolean dispatch_new_producer = false;
|
||||||
|
boolean eat_it;
|
||||||
|
boolean keep_producing;
|
||||||
|
|
||||||
|
try (Lock locked = _locker.lock())
|
||||||
|
{
|
||||||
|
// Did we produced a task?
|
||||||
|
if (task == null)
|
||||||
|
{
|
||||||
|
// There is no task.
|
||||||
|
// Could another one just have been queued with a produce call?
|
||||||
|
if (_state==State.REPRODUCING)
|
||||||
|
{
|
||||||
|
_state = State.PRODUCING;
|
||||||
|
continue producing;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ... and no additional calls to execute, so we are idle
|
||||||
|
_state = State.IDLE;
|
||||||
|
keep_producing = false;
|
||||||
|
break producing;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Will we eat our own kill?
|
||||||
|
if (may_block_caller && Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING)
|
||||||
|
{
|
||||||
|
eat_it = true;
|
||||||
|
keep_producing = true;
|
||||||
|
}
|
||||||
|
else if (_pendingProducers==0)
|
||||||
|
{
|
||||||
|
keep_producing = true;
|
||||||
|
eat_it = false;
|
||||||
|
if ((_pendingProducersDispatched + _pendingProducers)<_pendingProducersMax)
|
||||||
|
{
|
||||||
|
_pendingProducersDispatched++;
|
||||||
|
dispatch_new_producer = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
eat_it = true;
|
||||||
|
keep_producing = false;
|
||||||
|
dispatch_new_producer = true;
|
||||||
|
_pendingProducersDispatched++;
|
||||||
|
_state = State.IDLE;
|
||||||
|
_pendingProducers--;
|
||||||
|
_produce.signal();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} mbc={} dnp={} ei={} kp={}", this,may_block_caller,dispatch_new_producer,eat_it,keep_producing);
|
||||||
|
|
||||||
|
// Run or execute the task.
|
||||||
|
if (task != null)
|
||||||
|
{;
|
||||||
|
if (eat_it)
|
||||||
|
_executor.invoke(task);
|
||||||
|
else
|
||||||
|
_executor.execute(task,_executeType);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we need more producers
|
||||||
|
if (dispatch_new_producer)
|
||||||
|
{
|
||||||
|
// Spawn a new thread to continue production by running the produce loop.
|
||||||
|
_executor.execute(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Once we have run the task, we can try producing again.
|
||||||
|
if (keep_producing)
|
||||||
|
continue producing;
|
||||||
|
|
||||||
|
try (Lock locked = _locker.lock())
|
||||||
|
{
|
||||||
|
switch(_state)
|
||||||
|
{
|
||||||
|
case IDLE:
|
||||||
|
_state = State.PRODUCING;
|
||||||
|
continue producing;
|
||||||
|
|
||||||
|
default:
|
||||||
|
// Perhaps we can be a pending Producer?
|
||||||
|
if (pendingProducerWait())
|
||||||
|
continue producing;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break producing;
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("{} produce exit",this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean isIdle()
|
||||||
|
{
|
||||||
|
try (Lock locked = _locker.lock())
|
||||||
|
{
|
||||||
|
return _state==State.IDLE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doStop() throws Exception
|
||||||
|
{
|
||||||
|
try (Lock locked = _locker.lock())
|
||||||
|
{
|
||||||
|
_pendingProducers=0;
|
||||||
|
_produce.signalAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
builder.append(super.toString());
|
||||||
|
builder.append('/');
|
||||||
|
try (Lock locked = _locker.lock())
|
||||||
|
{
|
||||||
|
builder.append(_state);
|
||||||
|
builder.append('/');
|
||||||
|
builder.append(_pendingProducers);
|
||||||
|
builder.append('/');
|
||||||
|
builder.append(_pendingProducersMax);
|
||||||
|
builder.append('/');
|
||||||
|
}
|
||||||
|
builder.append(_producer);
|
||||||
|
return builder.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class RunProduce implements Runnable
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
produce();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Factory implements ExecutionStrategy.Factory
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
|
||||||
|
{
|
||||||
|
return new EatWhatYouKill(producer, executor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,6 +24,7 @@ import org.eclipse.jetty.util.log.Log;
|
||||||
import org.eclipse.jetty.util.log.Logger;
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
||||||
import org.eclipse.jetty.util.thread.Invocable;
|
import org.eclipse.jetty.util.thread.Invocable;
|
||||||
|
import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor;
|
||||||
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
|
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
|
||||||
import org.eclipse.jetty.util.thread.Locker;
|
import org.eclipse.jetty.util.thread.Locker;
|
||||||
import org.eclipse.jetty.util.thread.Locker.Lock;
|
import org.eclipse.jetty.util.thread.Locker.Lock;
|
||||||
|
@ -41,13 +42,14 @@ import org.eclipse.jetty.util.thread.Locker.Lock;
|
||||||
* does not yet have capacity to consume, which can save memory and exert back
|
* does not yet have capacity to consume, which can save memory and exert back
|
||||||
* pressure on producers.</p>
|
* pressure on producers.</p>
|
||||||
*/
|
*/
|
||||||
public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements ExecutionStrategy, Runnable
|
public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
|
||||||
{
|
{
|
||||||
private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
|
private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
|
||||||
|
|
||||||
private final Locker _locker = new Locker();
|
private final Locker _locker = new Locker();
|
||||||
private final Runnable _runProduce = new RunProduce();
|
private final Runnable _runProduce = new RunProduce();
|
||||||
private final Producer _producer;
|
private final Producer _producer;
|
||||||
|
private final InvocableExecutor _executor;
|
||||||
private boolean _idle = true;
|
private boolean _idle = true;
|
||||||
private boolean _execute;
|
private boolean _execute;
|
||||||
private boolean _producing;
|
private boolean _producing;
|
||||||
|
@ -61,8 +63,8 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
|
||||||
|
|
||||||
public ExecuteProduceConsume(Producer producer, Executor executor, InvocationType preferred )
|
public ExecuteProduceConsume(Producer producer, Executor executor, InvocationType preferred )
|
||||||
{
|
{
|
||||||
super(executor,preferred);
|
|
||||||
this._producer = producer;
|
this._producer = producer;
|
||||||
|
_executor = new InvocableExecutor(executor,preferred);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -111,7 +113,7 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
|
||||||
_execute = true;
|
_execute = true;
|
||||||
}
|
}
|
||||||
if (dispatch)
|
if (dispatch)
|
||||||
execute(_runProduce);
|
_executor.execute(_runProduce);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -190,15 +192,14 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
|
||||||
// Spawn a new thread to continue production by running the produce loop.
|
// Spawn a new thread to continue production by running the produce loop.
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} dispatch", this);
|
LOG.debug("{} dispatch", this);
|
||||||
if (!execute(this))
|
_executor.execute(task);
|
||||||
task = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the task.
|
// Run the task.
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} run {}", this, task);
|
LOG.debug("{} run {}", this, task);
|
||||||
if (task != null)
|
if (task != null)
|
||||||
invoke(task);
|
_executor.invoke(task);
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} ran {}", this, task);
|
LOG.debug("{} ran {}", this, task);
|
||||||
|
|
||||||
|
|
|
@ -1,88 +0,0 @@
|
||||||
//
|
|
||||||
// ========================================================================
|
|
||||||
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
|
||||||
// ------------------------------------------------------------------------
|
|
||||||
// All rights reserved. This program and the accompanying materials
|
|
||||||
// are made available under the terms of the Eclipse Public License v1.0
|
|
||||||
// and Apache License v2.0 which accompanies this distribution.
|
|
||||||
//
|
|
||||||
// The Eclipse Public License is available at
|
|
||||||
// http://www.eclipse.org/legal/epl-v10.html
|
|
||||||
//
|
|
||||||
// The Apache License v2.0 is available at
|
|
||||||
// http://www.opensource.org/licenses/apache2.0.php
|
|
||||||
//
|
|
||||||
// You may elect to redistribute this code under either of these licenses.
|
|
||||||
// ========================================================================
|
|
||||||
//
|
|
||||||
|
|
||||||
package org.eclipse.jetty.util.thread.strategy;
|
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
|
||||||
|
|
||||||
import org.eclipse.jetty.util.log.Log;
|
|
||||||
import org.eclipse.jetty.util.log.Logger;
|
|
||||||
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
|
||||||
import org.eclipse.jetty.util.thread.Invocable;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>Base class for strategies that need to execute a task by submitting it to an {@link Executor}.</p>
|
|
||||||
* <p>If the submission to the {@code Executor} is rejected (via a {@link RejectedExecutionException}),
|
|
||||||
* the task is tested whether it implements {@link Closeable}; if it does, then {@link Closeable#close()}
|
|
||||||
* is called on the task object.</p>
|
|
||||||
*/
|
|
||||||
public abstract class ExecutingExecutionStrategy implements ExecutionStrategy
|
|
||||||
{
|
|
||||||
private static final Logger LOG = Log.getLogger(ExecutingExecutionStrategy.class);
|
|
||||||
|
|
||||||
private final Executor _executor;
|
|
||||||
private final Invocable.InvocationType _preferredInvocationType;
|
|
||||||
|
|
||||||
protected ExecutingExecutionStrategy(Executor executor,Invocable.InvocationType preferred)
|
|
||||||
{
|
|
||||||
_executor=executor;
|
|
||||||
_preferredInvocationType=preferred;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Invocable.InvocationType getPreferredInvocationType()
|
|
||||||
{
|
|
||||||
return _preferredInvocationType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void invoke(Runnable task)
|
|
||||||
{
|
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug("{} invoke {}", this, task);
|
|
||||||
Invocable.invokePreferred(task,_preferredInvocationType);
|
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug("{} invoked {}", this, task);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected boolean execute(Runnable task)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
_executor.execute(Invocable.asPreferred(task,_preferredInvocationType));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
catch(RejectedExecutionException e)
|
|
||||||
{
|
|
||||||
// If we cannot execute, then close the task and keep producing.
|
|
||||||
LOG.debug(e);
|
|
||||||
LOG.warn("Rejected execution of {}",task);
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (task instanceof Closeable)
|
|
||||||
((Closeable)task).close();
|
|
||||||
}
|
|
||||||
catch (Exception x)
|
|
||||||
{
|
|
||||||
e.addSuppressed(x);
|
|
||||||
LOG.warn(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -23,7 +23,8 @@ import java.util.concurrent.Executor;
|
||||||
import org.eclipse.jetty.util.log.Log;
|
import org.eclipse.jetty.util.log.Log;
|
||||||
import org.eclipse.jetty.util.log.Logger;
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
||||||
import org.eclipse.jetty.util.thread.Invocable;
|
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
|
||||||
|
import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor;
|
||||||
import org.eclipse.jetty.util.thread.Locker;
|
import org.eclipse.jetty.util.thread.Locker;
|
||||||
import org.eclipse.jetty.util.thread.Locker.Lock;
|
import org.eclipse.jetty.util.thread.Locker.Lock;
|
||||||
|
|
||||||
|
@ -31,23 +32,24 @@ import org.eclipse.jetty.util.thread.Locker.Lock;
|
||||||
* <p>A strategy where the caller thread iterates over task production, submitting each
|
* <p>A strategy where the caller thread iterates over task production, submitting each
|
||||||
* task to an {@link Executor} for execution.</p>
|
* task to an {@link Executor} for execution.</p>
|
||||||
*/
|
*/
|
||||||
public class ProduceExecuteConsume extends ExecutingExecutionStrategy implements ExecutionStrategy
|
public class ProduceExecuteConsume implements ExecutionStrategy
|
||||||
{
|
{
|
||||||
private static final Logger LOG = Log.getLogger(ProduceExecuteConsume.class);
|
private static final Logger LOG = Log.getLogger(ProduceExecuteConsume.class);
|
||||||
|
|
||||||
private final Locker _locker = new Locker();
|
private final Locker _locker = new Locker();
|
||||||
private final Producer _producer;
|
private final Producer _producer;
|
||||||
|
private final InvocableExecutor _executor;
|
||||||
private State _state = State.IDLE;
|
private State _state = State.IDLE;
|
||||||
|
|
||||||
public ProduceExecuteConsume(Producer producer, Executor executor)
|
public ProduceExecuteConsume(Producer producer, Executor executor)
|
||||||
{
|
{
|
||||||
this(producer,executor,Invocable.InvocationType.NON_BLOCKING);
|
this(producer,executor,InvocationType.NON_BLOCKING);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ProduceExecuteConsume(Producer producer, Executor executor, Invocable.InvocationType preferred)
|
public ProduceExecuteConsume(Producer producer, Executor executor, InvocationType preferred)
|
||||||
{
|
{
|
||||||
super(executor,preferred);
|
_producer = producer;
|
||||||
this._producer = producer;
|
_executor = new InvocableExecutor(executor,preferred);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -95,7 +97,7 @@ public class ProduceExecuteConsume extends ExecutingExecutionStrategy implements
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute the task.
|
// Execute the task.
|
||||||
execute(task);
|
_executor.execute(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,226 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.util.thread.strategy;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.greaterThan;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.component.LifeCycle;
|
||||||
|
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
||||||
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
|
import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class ExecutionStrategyTest
|
||||||
|
{
|
||||||
|
@Parameterized.Parameters(name = "{0}")
|
||||||
|
public static Iterable<Object[]> data()
|
||||||
|
{
|
||||||
|
return Arrays.asList(new Object[][]{
|
||||||
|
{ProduceExecuteConsume.class},
|
||||||
|
{ExecuteProduceConsume.class},
|
||||||
|
{EatWhatYouKill.class}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
QueuedThreadPool threads = new QueuedThreadPool(20);
|
||||||
|
Class<? extends ExecutionStrategy> _strategyClass;
|
||||||
|
ExecutionStrategy _strategy;
|
||||||
|
|
||||||
|
public ExecutionStrategyTest(Class<? extends ExecutionStrategy> strategy)
|
||||||
|
{
|
||||||
|
_strategyClass = strategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
void newExecutionStrategy(Producer producer, Executor executor) throws Exception
|
||||||
|
{
|
||||||
|
_strategy = _strategyClass.getConstructor(Producer.class,Executor.class).newInstance(producer,executor);
|
||||||
|
if (_strategy instanceof LifeCycle)
|
||||||
|
((LifeCycle)_strategy).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws Exception
|
||||||
|
{
|
||||||
|
threads.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() throws Exception
|
||||||
|
{
|
||||||
|
if (_strategy instanceof LifeCycle)
|
||||||
|
((LifeCycle)_strategy).stop();
|
||||||
|
threads.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static abstract class TestProducer implements Producer
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return "TestProducer";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void idleTest() throws Exception
|
||||||
|
{
|
||||||
|
AtomicInteger count = new AtomicInteger(0);
|
||||||
|
Producer producer = new TestProducer()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Runnable produce()
|
||||||
|
{
|
||||||
|
count.incrementAndGet();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
newExecutionStrategy(producer,threads);
|
||||||
|
_strategy.produce();
|
||||||
|
assertThat(count.get(),greaterThan(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void simpleTest() throws Exception
|
||||||
|
{
|
||||||
|
final int TASKS = 3*threads.getMaxThreads();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(TASKS);
|
||||||
|
Producer producer = new TestProducer()
|
||||||
|
{
|
||||||
|
int tasks = TASKS;
|
||||||
|
@Override
|
||||||
|
public Runnable produce()
|
||||||
|
{
|
||||||
|
if (tasks-->0)
|
||||||
|
{
|
||||||
|
return new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
newExecutionStrategy(producer,threads);
|
||||||
|
|
||||||
|
for (int p=0; latch.getCount()>0 && p<TASKS; p++)
|
||||||
|
_strategy.produce();
|
||||||
|
|
||||||
|
assertTrue(latch.await(10,TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void blockingProducerTest() throws Exception
|
||||||
|
{
|
||||||
|
final int TASKS = 3*threads.getMaxThreads();
|
||||||
|
final BlockingQueue<CountDownLatch> q = new ArrayBlockingQueue<>(500);
|
||||||
|
|
||||||
|
Producer producer = new TestProducer()
|
||||||
|
{
|
||||||
|
int tasks=TASKS;
|
||||||
|
@Override
|
||||||
|
public Runnable produce()
|
||||||
|
{
|
||||||
|
if (tasks-->0)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
final CountDownLatch latch = q.poll(10,TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
if (latch!=null)
|
||||||
|
{
|
||||||
|
return new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch(InterruptedException e)
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
newExecutionStrategy(producer,threads);
|
||||||
|
|
||||||
|
final CountDownLatch latch = new CountDownLatch(TASKS);
|
||||||
|
threads.execute(new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
for (int t=TASKS;t-->0;)
|
||||||
|
{
|
||||||
|
Thread.sleep(20);
|
||||||
|
q.offer(latch);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch(Exception e)
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
for (int p=0; latch.getCount()>0 && p<TASKS; p++)
|
||||||
|
{
|
||||||
|
_strategy.produce();
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(latch.await(10,TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -3,3 +3,5 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||||
#org.eclipse.jetty.util.LEVEL=DEBUG
|
#org.eclipse.jetty.util.LEVEL=DEBUG
|
||||||
|
|
||||||
#org.eclipse.jetty.util.PathWatcher.Noisy.LEVEL=OFF
|
#org.eclipse.jetty.util.PathWatcher.Noisy.LEVEL=OFF
|
||||||
|
|
||||||
|
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.LEVEL=DEBUG
|
Loading…
Reference in New Issue