Experiment with lock free AdaptiveExecutionStrategy (#8762)

Removed the lock from the AdaptiveExecutionStrategy in favour of a BiInteger.
This commit is contained in:
Greg Wilkins 2022-11-02 08:16:24 +11:00 committed by GitHub
parent 4e32557616
commit 7e88b8234c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 124 additions and 62 deletions

View File

@ -20,13 +20,13 @@ import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.TryExecutor;
@ -98,12 +98,9 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
/**
* The production state of the strategy.
*/
private enum State
{
IDLE, // No tasks or producers.
PRODUCING, // There is an active producing thread.
REPRODUCING // There is an active producing thread and demand for more production.
}
static final int IDLE = 0; // No tasks or producers.
static final int PRODUCING = 1; // There is an active producing thread.
static final int REPRODUCING = 2; // There is an active producing thread and demand for more production.
/**
* The sub-strategies used by the strategy to consume tasks that are produced.
@ -128,7 +125,6 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
EXECUTE_PRODUCE_CONSUME
}
private final AutoLock _lock = new AutoLock();
private final LongAdder _pcMode = new LongAdder();
private final LongAdder _picMode = new LongAdder();
private final LongAdder _pecMode = new LongAdder();
@ -138,8 +134,7 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
private final TryExecutor _tryExecutor;
private final Runnable _runPendingProducer = () -> tryProduce(true);
private boolean _useVirtualThreads;
private State _state = State.IDLE;
private boolean _pending;
private final AtomicBiInteger _state = new AtomicBiInteger();
/**
* @param producer The producer of tasks to be consumed.
@ -167,26 +162,33 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
public void dispatch()
{
boolean execute = false;
try (AutoLock l = _lock.lock())
loop: while (true)
{
switch (_state)
long biState = _state.get();
int state = AtomicBiInteger.getLo(biState);
int pending = AtomicBiInteger.getHi(biState);
switch (state)
{
case IDLE:
if (!_pending)
if (pending <= 0)
{
_pending = true;
if (!_state.compareAndSet(biState, pending + 1, state))
continue;
execute = true;
}
break;
break loop;
case PRODUCING:
_state = State.REPRODUCING;
break;
if (!_state.compareAndSet(biState, pending, REPRODUCING))
continue;
break loop;
default:
break;
break loop;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} dispatch {}", this, execute);
if (execute)
@ -209,39 +211,47 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
if (LOG.isDebugEnabled())
LOG.debug("{} tryProduce {}", this, wasPending);
// Takes the lock to atomically check if the thread can produce.
try (AutoLock l = _lock.lock())
// check if the thread can produce.
loop: while (true)
{
long biState = _state.get();
int state = AtomicBiInteger.getLo(biState);
int pending = AtomicBiInteger.getHi(biState);
// If the calling thread was the pending producer, there is no longer one pending.
if (wasPending)
_pending = false;
pending--;
switch (_state)
switch (state)
{
case IDLE:
// The strategy was IDLE, so this thread can become the producer.
_state = State.PRODUCING;
break;
if (!_state.compareAndSet(biState, pending, PRODUCING))
continue;
break loop;
case PRODUCING:
// The strategy is already producing, so another thread must be the producer.
// However, it may be just about to stop being the producer so we set the
// REPRODUCING state to force it to produce at least once more.
_state = State.REPRODUCING;
if (!_state.compareAndSet(biState, pending, REPRODUCING))
continue;
return;
case REPRODUCING:
// Another thread is already producing and will already try again to produce.
if (!_state.compareAndSet(biState, pending, state))
continue;
return;
default:
throw new IllegalStateException(toStringLocked());
throw new IllegalStateException(toString(biState));
}
}
// Determine the thread's invocation type once, outside of the production loop.
boolean nonBlocking = Invocable.isNonBlockingInvocation();
while (isRunning())
running: while (isRunning())
{
try
{
@ -250,24 +260,30 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
// If we did not produce a task
if (task == null)
{
// take the lock to atomically determine if we should keep producing.
try (AutoLock l = _lock.lock())
// determine if we should keep producing.
while (true)
{
switch (_state)
long biState = _state.get();
int state = AtomicBiInteger.getLo(biState);
int pending = AtomicBiInteger.getHi(biState);
switch (state)
{
case PRODUCING:
// The calling thread was the only producer, so it is now IDLE and we stop producing.
_state = State.IDLE;
if (!_state.compareAndSet(biState, pending, IDLE))
continue;
return;
case REPRODUCING:
// Another thread may have queued a task and tried to produce
// so the calling thread should continue to produce.
_state = State.PRODUCING;
continue;
if (!_state.compareAndSet(biState, pending, PRODUCING))
continue;
continue running;
default:
throw new IllegalStateException(toStringLocked());
throw new IllegalStateException(toString(biState));
}
}
}
@ -303,53 +319,90 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
return SubStrategy.PRODUCE_CONSUME;
case EITHER:
{
// The produced task may be run either as blocking or non blocking.
// If the calling producing thread is already non-blocking, use PC.
if (nonBlocking)
return SubStrategy.PRODUCE_CONSUME;
// Take the lock to atomically check if a pending producer is available.
try (AutoLock l = _lock.lock())
// check if a pending producer is available.
int executed = 0;
while (true)
{
long biState = _state.get();
int state = AtomicBiInteger.getLo(biState);
int pending = AtomicBiInteger.getHi(biState);
// If a pending producer is available or one can be started
if (_pending || _tryExecutor.tryExecute(_runPendingProducer))
pending += executed;
if (pending <= 0 && _tryExecutor.tryExecute(_runPendingProducer))
{
executed++;
pending++;
}
if (pending > 0)
{
// Use EPC: the producer directly consumes the task, which may block
// and then races with the pending producer to resume production.
_pending = true;
_state = State.IDLE;
if (!_state.compareAndSet(biState, pending, IDLE))
continue;
return SubStrategy.EXECUTE_PRODUCE_CONSUME;
}
if (!_state.compareAndSet(biState, pending, state))
continue;
break;
}
// Otherwise use PIC: the producer consumes the task
// in non-blocking mode and then resumes production.
return SubStrategy.PRODUCE_INVOKE_CONSUME;
}
case BLOCKING:
{
// The produced task may block.
// If the calling producing thread may also block
if (!nonBlocking)
{
// Take the lock to atomically check if a pending producer is available.
try (AutoLock l = _lock.lock())
// check if a pending producer is available.
int executed = 0;
while (true)
{
long biState = _state.get();
int state = AtomicBiInteger.getLo(biState);
int pending = AtomicBiInteger.getHi(biState);
// If a pending producer is available or one can be started
if (_pending || _tryExecutor.tryExecute(_runPendingProducer))
pending += executed;
if (pending <= 0 && _tryExecutor.tryExecute(_runPendingProducer))
{
executed++;
pending++;
}
// If a pending producer is available or one can be started
if (pending > 0)
{
// use EPC: The producer directly consumes the task, which may block
// and then races with the pending producer to resume production.
_pending = true;
_state = State.IDLE;
if (!_state.compareAndSet(biState, pending, IDLE))
continue;
return SubStrategy.EXECUTE_PRODUCE_CONSUME;
}
if (!_state.compareAndSet(biState, pending, state))
continue;
break;
}
}
// Otherwise use PEC: the task is consumed by the executor and the producer continues to produce.
return SubStrategy.PRODUCE_EXECUTE_CONSUME;
}
default:
throw new IllegalStateException(String.format("taskType=%s %s", taskType, this));
@ -390,19 +443,25 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
runTask(task);
// Race the pending producer to produce again.
try (AutoLock l = _lock.lock())
while (true)
{
if (_state == State.IDLE)
long biState = _state.get();
int state = AtomicBiInteger.getLo(biState);
int pending = AtomicBiInteger.getHi(biState);
if (state == IDLE)
{
// We beat the pending producer, so we will become the producer instead.
// The pending produce will become a noop if it arrives whilst we are producing,
// or it may take over if we subsequently do another EPC consumption.
_state = State.PRODUCING;
if (!_state.compareAndSet(biState, pending, PRODUCING))
continue;
return true;
}
// The pending producer is now producing, so this thread no longer produces.
return false;
}
// The pending producer is now producing, so this thread no longer produces.
return false;
default:
throw new IllegalStateException(String.format("ss=%s %s", subStrategy, this));
@ -521,10 +580,7 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
@ManagedAttribute(value = "whether this execution strategy is idle", readonly = true)
public boolean isIdle()
{
try (AutoLock l = _lock.lock())
{
return _state == State.IDLE;
}
return _state.getLo() == IDLE;
}
@ManagedOperation(value = "resets the task counts", impact = "ACTION")
@ -539,17 +595,14 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
@Override
public String toString()
{
try (AutoLock l = _lock.lock())
{
return toStringLocked();
}
return toString(_state.get());
}
public String toStringLocked()
public String toString(long biState)
{
StringBuilder builder = new StringBuilder();
getString(builder);
getState(builder);
getState(builder, biState);
return builder.toString();
}
@ -563,11 +616,20 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
builder.append('/');
}
private void getState(StringBuilder builder)
private void getState(StringBuilder builder, long biState)
{
builder.append(_state);
int state = AtomicBiInteger.getLo(biState);
int pending = AtomicBiInteger.getHi(biState);
builder.append(
switch (state)
{
case IDLE -> "IDLE";
case PRODUCING -> "PRODUCING";
case REPRODUCING -> "REPRODUCING";
default -> "UNKNOWN(%d)".formatted(state);
});
builder.append("/p=");
builder.append(_pending);
builder.append(pending);
builder.append('/');
builder.append(_tryExecutor);
builder.append("[pc=");