Fixes #546 (Guard concurrent calls to ExecutionStrategy.execute())

Guarded invocations to execute() with a state change.
This commit is contained in:
Simone Bordet 2016-05-04 11:43:50 +02:00
parent 49a19abd71
commit a0bd9d3887
2 changed files with 45 additions and 3 deletions

View File

@ -34,6 +34,7 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
private final Producer _producer; private final Producer _producer;
private final Executor _executor; private final Executor _executor;
private State _state = State.IDLE;
public ProduceConsume(Producer producer, Executor executor) public ProduceConsume(Producer producer, Executor executor)
{ {
@ -44,16 +45,31 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
@Override @Override
public void execute() public void execute()
{ {
synchronized (this)
{
_state = _state == State.IDLE ? State.PRODUCE : State.EXECUTE;
if (_state == State.EXECUTE)
return;
}
// Iterate until we are complete. // Iterate until we are complete.
while (true) while (true)
{ {
// Produce a task. // Produce a task.
Runnable task = _producer.produce(); Runnable task = _producer.produce();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} produced {}", this, task); LOG.debug("{} produced {}", _producer, task);
if (task == null) if (task == null)
break; {
synchronized (this)
{
_state = _state == State.PRODUCE ? State.IDLE : State.PRODUCE;
if (_state == State.PRODUCE)
continue;
return;
}
}
// Run the task. // Run the task.
task.run(); task.run();
@ -80,4 +96,9 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
return new ProduceConsume(producer, executor); return new ProduceConsume(producer, executor);
} }
} }
private enum State
{
IDLE, PRODUCE, EXECUTE
}
} }

View File

@ -35,6 +35,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy
private final Producer _producer; private final Producer _producer;
private final Executor _executor; private final Executor _executor;
private State _state = State.IDLE;
public ProduceExecuteConsume(Producer producer, Executor executor) public ProduceExecuteConsume(Producer producer, Executor executor)
{ {
@ -45,6 +46,13 @@ public class ProduceExecuteConsume implements ExecutionStrategy
@Override @Override
public void execute() public void execute()
{ {
synchronized (this)
{
_state = _state == State.IDLE ? State.PRODUCE : State.EXECUTE;
if (_state == State.EXECUTE)
return;
}
// Iterate until we are complete. // Iterate until we are complete.
while (true) while (true)
{ {
@ -54,7 +62,15 @@ public class ProduceExecuteConsume implements ExecutionStrategy
LOG.debug("{} produced {}", _producer, task); LOG.debug("{} produced {}", _producer, task);
if (task == null) if (task == null)
break; {
synchronized (this)
{
_state = _state == State.PRODUCE ? State.IDLE : State.PRODUCE;
if (_state == State.PRODUCE)
continue;
return;
}
}
// Execute the task. // Execute the task.
try try
@ -94,4 +110,9 @@ public class ProduceExecuteConsume implements ExecutionStrategy
return new ProduceExecuteConsume(producer, executor); return new ProduceExecuteConsume(producer, executor);
} }
} }
private enum State
{
IDLE, PRODUCE, EXECUTE
}
} }