Cleaned up ExecutionStrategy by adding javadocs, removing methods not

used and renaming classes and methods to better names.
This commit is contained in:
Simone Bordet 2014-12-19 11:25:09 +01:00
parent 393bfa76f0
commit 448f150ac4
3 changed files with 77 additions and 83 deletions

View File

@ -51,7 +51,7 @@ public class HTTP2Connection extends AbstractConnection
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
this.executionStrategy = new ExecutionStrategy.EatWhatYouKill(new HTTP2Producer(), executor);
this.executionStrategy = new ExecutionStrategy.ExecuteProduceRun(new HTTP2Producer(), executor);
}
protected ISession getSession()
@ -79,7 +79,7 @@ public class HTTP2Connection extends AbstractConnection
@Override
public void onFillable()
{
executionStrategy.produce();
executionStrategy.execute();
}
private int fill(EndPoint endPoint, ByteBuffer buffer)
@ -177,10 +177,5 @@ public class HTTP2Connection extends AbstractConnection
buffer = null;
}
}
@Override
public void onProductionComplete()
{
}
}
}

View File

@ -68,7 +68,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
public ManagedSelector(SelectorManager selectorManager, int id)
{
_selectorManager = selectorManager;
_strategy = new ExecutionStrategy.EatWhatYouKill(this, selectorManager.getExecutor());
_strategy = new ExecutionStrategy.ExecuteProduceRun(this, selectorManager.getExecutor());
_id = id;
setStopTimeout(5000);
}
@ -171,7 +171,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
public void run()
{
while (isRunning() || isStopping())
_strategy.produce();
_strategy.execute();
}
@ -350,11 +350,6 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
_selections = _selectedKeys.iterator();
}
@Override
public void onProductionComplete()
{
}
private void updateKey(SelectionKey key)
{
Object attachment = key.attachment();

View File

@ -22,97 +22,99 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
/**
* Strategies to execute Producers
* <p>An {@link ExecutionStrategy} executes {@link Runnable} tasks produced by a {@link Producer}.
* The strategy to execute the task may vary depending on the implementation; the task may be
* run in the calling thread, or in a new thread, etc.</p>
* <p>The strategy delegates the production of tasks to a {@link Producer}, and continues to
* execute tasks until the producer continues to produce them.</p>
*/
public abstract class ExecutionStrategy
public interface ExecutionStrategy
{
/**
* Initiates (or resumes) the task production and execution.
*/
public void execute();
/**
* <p>A producer of {@link Runnable} tasks to run.</p>
* <p>The {@link ExecutionStrategy} will repeatedly invoke {@link #produce()} until
* the producer returns null, indicating that it has nothing more to produce.</p>
* <p>When no more tasks can be produced, implementations should arrange for the
* {@link ExecutionStrategy} to be invoked again in case an external event resumes
* the tasks production.</p>
*/
public interface Producer
{
/**
* Produce a task to run
* <p>Produces a task to be executed.</p>
*
* @return A task to run or null if we are complete.
* @return a task to executed or null if there are no more tasks to execute
*/
Runnable produce();
/**
* Called to signal production is completed
*/
void onProductionComplete();
}
protected final Producer _producer;
protected final Executor _executor;
protected ExecutionStrategy(Producer producer, Executor executor)
{
_producer = producer;
_executor = executor;
}
public abstract void produce();
/**
* Simple iterative strategy.
* Iterate over production until complete and execute each task.
* <p>A strategy where the caller thread iterates over task production, submitting each
* task to an {@link Executor} for execution.</p>
*/
public static class Iterative extends ExecutionStrategy
public static class ProduceExecuteRun implements ExecutionStrategy
{
public Iterative(Producer producer, Executor executor)
private final Producer _producer;
private final Executor _executor;
public ProduceExecuteRun(Producer producer, Executor executor)
{
super(producer, executor);
this._producer = producer;
this._executor = executor;
}
public void produce()
@Override
public void execute()
{
try
// Iterate until we are complete.
while (true)
{
// Iterate until we are complete
while (true)
{
// produce a task
Runnable task = _producer.produce();
// Produce a task.
Runnable task = _producer.produce();
if (task == null)
break;
if (task == null)
break;
// execute the task
_executor.execute(task);
}
}
finally
{
_producer.onProductionComplete();
// Execute the task.
_executor.execute(task);
}
}
}
/**
* A Strategy that allows threads to run the tasks that they have produced,
* so execution is done with a hot cache (ie threads eat what they kill).
* <p/>
* The phrase 'eat what you kill' comes from the hunting ethic that says a person
* shouldnt kill anything he or she doesnt plan on eating. It was taken up in its
* <p>A strategy where the caller thread produces a task, then arranges another
* thread to continue production, and then runs the task.</p>
* <p>The phrase 'eat what you kill' comes from the hunting ethic that says a person
* should not kill anything he or she does not plan on eating. It was taken up in its
* more general sense by lawyers, who used it to mean that an individuals earnings
* should be based on how much business that person brings to the firm and the phrase
* is now quite common throughout the business world. In this case, the phrase is
* used to mean that a thread should not produce a task that it does not intend
* to consume. By making producers consume the task that they have just generated
* avoids execution delays and avoids parallel slow down by doing the consumption with
* a hot cache. It also avoids the creation of a queue of produced events that the
* system does not yet have capacity to consume, which can save memory and exert back
* pressure on producers.
* is now quite common throughout the business world.</p>
* <p>In this case, the phrase is used to mean that a thread should not produce a
* task that it does not intend to run. By making producers run the task that they
* have just produced avoids execution delays and avoids parallel slow down by running
* the task in the same core, with good chances of having a hot CPU cache. It also
* avoids the creation of a queue of produced tasks that the system does not yet have
* capacity to consume, which can save memory and exert back pressure on producers.</p>
*/
public static class EatWhatYouKill extends ExecutionStrategy implements Runnable
public static class ExecuteProduceRun implements ExecutionStrategy, Runnable
{
private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
private final Producer _producer;
private final Executor _executor;
public EatWhatYouKill(Producer producer, Executor executor)
public ExecuteProduceRun(Producer producer, Executor executor)
{
super(producer, executor);
this._producer = producer;
this._executor = executor;
}
public void produce()
@Override
public void execute()
{
while (true)
{
@ -134,35 +136,37 @@ public abstract class ExecutionStrategy
@Override
public void run()
{
// A new thread has arrived, so clear pending
// and try to set producing.
// A new thread has arrived, so clear the PENDING
// flag and try to set the PRODUCING flag.
if (!clearPendingTryProducing())
return;
while (true)
{
// If we got here, then we are the thread that is producing
// If we got here, then we are the thread that is producing.
Runnable task = _producer.produce();
// If no task was produced
// If no task was produced...
if (task == null)
{
// If we are the thread that sets idle
if (tryIdle())
// signal that production has stopped
_producer.onProductionComplete();
// ...and we are the thread that sets the IDLE flag,
// then production has stopped.
tryIdle();
return;
}
// We have finished producing, so clear producing and try to
// set pending
// We have produced, so clear the PRODUCING flag
// and try to set the PENDING flag.
if (clearProducingTryPending())
{
// Spawn a new thread to continue production.
_executor.execute(this);
}
// consume the task
// Run the task.
task.run();
// Once we have consumed, we can try producing again
// Once we have run the task, we can try producing again.
if (!tryProducing())
return;
}