updated EWYK strategy

This commit is contained in:
Greg Wilkins 2014-12-17 12:52:17 +01:00
parent a71cc6978a
commit 37e7e5217a
1 changed files with 160 additions and 77 deletions

View File

@ -20,14 +20,13 @@
package org.eclipse.jetty.util.thread; package org.eclipse.jetty.util.thread;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Strategies to execute Producers /** Strategies to execute Producers
*/ */
public abstract class ExecutionStrategy implements Runnable public abstract class ExecutionStrategy
{ {
public interface Producer public interface Producer
{ {
@ -36,18 +35,11 @@ public abstract class ExecutionStrategy implements Runnable
* @return A task to run or null if we are complete. * @return A task to run or null if we are complete.
*/ */
Runnable produce(); Runnable produce();
/**
* Check if there is more to produce. This method may not return valid
* results until {@link #produce()} has been called.
* @return true if this Producer may produce more tasks from {@link #produce()}
*/
boolean isMore();
/** /**
* Called to signal production is completed * Called to signal production is completed
*/ */
void onCompleted(); void onProductionComplete();
} }
protected final Producer _producer; protected final Producer _producer;
@ -59,6 +51,8 @@ public abstract class ExecutionStrategy implements Runnable
_executor=executor; _executor=executor;
} }
public abstract void produce();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Simple iterative strategy. /** Simple iterative strategy.
* Iterate over production until complete and execute each task. * Iterate over production until complete and execute each task.
@ -70,35 +64,26 @@ public abstract class ExecutionStrategy implements Runnable
super(producer,executor); super(producer,executor);
} }
public void run() public void produce()
{ {
try try
{ {
// Iterate until we are complete // Iterate until we are complete
loop: while (true) while (true)
{ {
// produce a task // produce a task
Runnable task=_producer.produce(); Runnable task=_producer.produce();
// if there is no task, break the loop
if (task==null) if (task==null)
break loop; break;
// If we are still producing, // execute the task
if (_producer.isMore()) _executor.execute(task);
// execute the task
_executor.execute(task);
else
{
// last task so we can run ourselves
task.run();
break loop;
}
} }
} }
finally finally
{ {
_producer.onCompleted(); _producer.onProductionComplete();
} }
} }
} }
@ -107,68 +92,166 @@ public abstract class ExecutionStrategy implements Runnable
/** /**
* A Strategy that allows threads to run the tasks that they have produced, * A Strategy that allows threads to run the tasks that they have produced,
* so execution is done with a hot cache (ie threads eat what they kill). * so execution is done with a hot cache (ie threads eat what they kill).
* <p>
* The phrase 'eat what you kill' comes from the hunting ethic that says a person
* shouldnt kill anything he or she doesnt plan on eating. It was taken up in its
* more general sense by lawyers, who used it to mean that an individuals earnings
* should be based on how much business that person brings to the firm and the phrase
* is now quite common throughout the business world. In this case, the phrase is
* used to mean that a thread should not produce a task that it does not intend
* to consume. By making producers consume the task that they have just generated
* avoids execution delays and avoids parallel slow down by doing the consumption with
* a hot cache. It also avoids the creation of a queue of produced events that the
* system does not yet have capacity to consume, which can save memory and exert back
* pressure on producers.
*/ */
public static class EatWhatYouKill extends ExecutionStrategy public static class EatWhatYouKill extends ExecutionStrategy implements Runnable
{ {
private final AtomicInteger _threads = new AtomicInteger(0); private enum State {IDLE,PRODUCING,PENDING,PRODUCING_PENDING};
private final AtomicReference<Boolean> _producing = new AtomicReference<Boolean>(Boolean.FALSE); private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
private volatile boolean _dispatched;
@Override
public void run()
{
// A new thread has arrived, so clear pending
// and try to set producing.
if (!clearPendingTryProducing())
return;
while (true)
{
// If we got here, then we are the thread that is producing
Runnable task=_producer.produce();
// If no task was produced
if (task==null)
{
// If we are the thread that sets idle
if (tryIdle())
// signal that production has stopped
_producer.onProductionComplete();
return;
}
// We have finished producing, so clear producing and try to
// set pending
if (clearProducingTryPending())
_executor.execute(this);
// consume the task
task.run();
// Once we have consumed, we can try producing again
if (!tryProducing())
return;
}
}
private boolean tryProducing()
{
while(true)
{
State state=_state.get();
switch(state)
{
case PENDING:
if (!_state.compareAndSet(state,State.PRODUCING_PENDING))
continue;
return true;
default:
return false;
}
}
}
private boolean clearProducingTryPending()
{
while(true)
{
State state=_state.get();
switch(state)
{
case PRODUCING:
if (!_state.compareAndSet(state,State.PENDING))
continue;
return true;
case PRODUCING_PENDING:
if (!_state.compareAndSet(state,State.PENDING))
continue;
return false;
default:
throw new IllegalStateException();
}
}
}
private boolean clearPendingTryProducing()
{
while(true)
{
State state=_state.get();
switch(state)
{
case IDLE:
return false;
case PENDING:
if (!_state.compareAndSet(state,State.PRODUCING))
continue;
return true;
case PRODUCING_PENDING:
if (!_state.compareAndSet(state,State.PRODUCING))
continue;
return false; // Another thread is already producing
case PRODUCING:
return false; // Another thread is already producing
}
}
}
private boolean tryIdle()
{
while(true)
{
State state=_state.get();
switch(state)
{
case PRODUCING:
case PRODUCING_PENDING:
if (!_state.compareAndSet(state,State.IDLE))
continue;
return true;
default:
return false;
}
}
}
public EatWhatYouKill(Producer producer, Executor executor) public EatWhatYouKill(Producer producer, Executor executor)
{ {
super(producer,executor); super(producer,executor);
} }
public void run() public void produce()
{ {
_dispatched=false; while(true)
// count the dispatched threads
_threads.incrementAndGet();
try
{ {
boolean complete=false; State state=_state.get();
loop: while (!complete) switch(state)
{ {
// If another thread is already producing, case IDLE:
if (!_producing.compareAndSet(false,true)) if (!_state.compareAndSet(state,State.PENDING))
// break the loop even if not complete continue;
break loop; run();
return;
// If we got here, then we are the thread that is producing default:
Runnable task=null; return;
try
{
task=_producer.produce();
complete=task==null || _producer.isMore();
}
finally
{
_producing.set(false);
}
// since we are going to eat the task we just "killed"
// then we may need another thread to keep producing
if (!complete && !_dispatched)
{
// Dispatch a thread to continue producing
_dispatched=true;
_executor.execute(this);
}
// If there is a task,
if (task!=null)
// run the task
task.run();
} }
}
finally
{
// If we were the last thread, signal completion
if (_threads.decrementAndGet()==0)
_producer.onCompleted();
} }
} }
} }
} }