EWYK cleanups from #1970

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2017-12-14 23:08:08 +01:00 committed by Simone Bordet
parent 2c0f695386
commit 0c021f2599
1 changed files with 52 additions and 47 deletions

View File

@ -19,6 +19,8 @@
package org.eclipse.jetty.util.thread.strategy;
import java.io.Closeable;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.LongAdder;
@ -32,8 +34,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
/**
@ -66,9 +66,8 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);
private enum State { IDLE, PRODUCING, REPRODUCING }
private enum State { IDLE, PENDING, PRODUCING, REPRODUCING }
private final Locker _locker = new Locker();
private final LongAdder _nonBlocking = new LongAdder();
private final LongAdder _blocking = new LongAdder();
private final LongAdder _executed = new LongAdder();
@ -101,12 +100,13 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
public void dispatch()
{
boolean execute = false;
try (Lock locked = _locker.lock())
synchronized(this)
{
switch(_state)
{
case IDLE:
execute = true;
_state = State.PENDING;
break;
case PRODUCING:
@ -136,19 +136,19 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{
if (LOG.isDebugEnabled())
LOG.debug("{} produce", this);
boolean reproduce = true;
while(isRunning() && tryProduce(reproduce) && doProduce())
reproduce = false;
if (tryProduce())
doProduce();
}
public boolean tryProduce(boolean reproduce)
private boolean tryProduce()
{
boolean producing = false;
try (Lock locked = _locker.lock())
synchronized(this)
{
switch (_state)
{
case IDLE:
case PENDING:
// Enter PRODUCING
_state = State.PRODUCING;
producing = true;
@ -156,7 +156,6 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
case PRODUCING:
// Keep other Thread producing
if (reproduce)
_state = State.REPRODUCING;
break;
@ -167,7 +166,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
return producing;
}
public boolean doProduce()
private void doProduce()
{
boolean producing = true;
while (isRunning() && producing)
@ -183,24 +182,22 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
LOG.warn(e);
}
if (LOG.isDebugEnabled())
LOG.debug("{} t={}/{}",this,task,Invocable.getInvocationType(task));
if (task==null)
{
try (Lock locked = _locker.lock())
synchronized(this)
{
// Could another one just have been queued with a produce call?
if (_state==State.REPRODUCING)
// Could another task just have been queued with a produce call?
switch (_state)
{
_state = State.PRODUCING;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{} IDLE",toStringLocked());
case PRODUCING:
_state = State.IDLE;
producing = false;
break;
case REPRODUCING:
_state = State.PRODUCING;
break;
default:
throw new IllegalStateException(toStringLocked());
}
}
}
@ -209,21 +206,19 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
boolean consume;
if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING)
{
// PRODUCE CONSUME (EWYK!)
if (LOG.isDebugEnabled())
LOG.debug("{} PC t={}", this, task);
// PRODUCE CONSUME
consume = true;
_nonBlocking.increment();
}
else
{
try (Lock locked = _locker.lock())
synchronized(this)
{
if (_producers.tryExecute(this))
{
// EXECUTE PRODUCE CONSUME!
// We have executed a new Producer, so we can EWYK consume
_state = State.IDLE;
_state = State.PENDING;
producing = false;
consume = true;
_blocking.increment();
@ -235,10 +230,10 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
_executed.increment();
}
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} {} t={}", this, consume ? "EPC" : "PEC", task);
}
LOG.debug("{} p={} c={} t={}/{}", this, producing, consume, task,Invocable.getInvocationType(task));
// Consume or execute task
try
@ -250,7 +245,10 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
}
catch (RejectedExecutionException e)
{
if (isRunning())
LOG.warn(e);
else
LOG.ignore(e);
if (task instanceof Closeable)
{
try
@ -269,8 +267,6 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
}
}
}
return producing;
}
@ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true)
@ -294,7 +290,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
@ManagedAttribute(value = "whether this execution strategy is idle", readonly = true)
public boolean isIdle()
{
try (Lock locked = _locker.lock())
synchronized(this)
{
return _state==State.IDLE;
}
@ -310,7 +306,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
public String toString()
{
try (Lock locked = _locker.lock())
synchronized(this)
{
return toStringLocked();
}
@ -339,5 +335,14 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
builder.append(_state);
builder.append('/');
builder.append(_producers);
builder.append("[nb=");
builder.append(getNonBlockingTasksConsumed());
builder.append(",c=");
builder.append(getBlockingTasksConsumed());
builder.append(",e=");
builder.append(getBlockingTasksExecuted());
builder.append("]");
builder.append("@");
builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
}
}