Fixes #1798 - JMXify EatWhatYouKill.

This commit is contained in:
Simone Bordet 2017-09-07 16:01:20 +02:00
parent 006dee439f
commit 2ff3a80800
3 changed files with 101 additions and 48 deletions

View File

@ -22,6 +22,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -32,6 +34,7 @@ import org.eclipse.jetty.util.log.Logger;
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is * with a Thread immediately being assigned the Runnable task, or fail if no Thread is
* available. Threads are preallocated up to the capacity from a wrapped {@link Executor}. * available. Threads are preallocated up to the capacity from a wrapped {@link Executor}.
*/ */
@ManagedObject("A pool for reserved threads")
public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor
{ {
private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class); private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class);
@ -80,12 +83,14 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
return _executor; return _executor;
} }
@ManagedAttribute(value = "max number of reserved threads", readonly = true)
public int getCapacity() public int getCapacity()
{ {
return _queue.length; return _queue.length;
} }
public int getPreallocated() @ManagedAttribute(value = "available reserved threads", readonly = true)
public int getAvailable()
{ {
try (Locker.Lock lock = _locker.lock()) try (Locker.Lock lock = _locker.lock())
{ {
@ -93,6 +98,15 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
} }
} }
@ManagedAttribute(value = "pending reserved threads", readonly = true)
public int getPending()
{
try (Locker.Lock lock = _locker.lock())
{
return _pending;
}
}
@Override @Override
public void doStart() throws Exception public void doStart() throws Exception
{ {

View File

@ -21,7 +21,11 @@ package org.eclipse.jetty.util.thread.strategy;
import java.io.Closeable; import java.io.Closeable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -57,6 +61,7 @@ import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
* sub-strategy is called ProduceExecuteConsume (PEC). * sub-strategy is called ProduceExecuteConsume (PEC).
* </p> * </p>
*/ */
@ManagedObject("eat what you kill execution strategy")
public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrategy, Runnable public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrategy, Runnable
{ {
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class); private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);
@ -64,19 +69,22 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
private enum State { IDLE, PRODUCING, REPRODUCING } private enum State { IDLE, PRODUCING, REPRODUCING }
private final Locker _locker = new Locker(); private final Locker _locker = new Locker();
private State _state = State.IDLE; private final LongAdder _nonBlocking = new LongAdder();
private final LongAdder _blocking = new LongAdder();
private final LongAdder _executed = new LongAdder();
private final Producer _producer; private final Producer _producer;
private final Executor _executor; private final Executor _executor;
private final ReservedThreadExecutor _producers; private final ReservedThreadExecutor _producers;
private State _state = State.IDLE;
public EatWhatYouKill(Producer producer, Executor executor) public EatWhatYouKill(Producer producer, Executor executor)
{ {
this(producer,executor,new ReservedThreadExecutor(executor,1)); this(producer,executor,new ReservedThreadExecutor(executor,1));
} }
public EatWhatYouKill(Producer producer, Executor executor, int maxProducersPending) public EatWhatYouKill(Producer producer, Executor executor, int maxReserved)
{ {
this(producer,executor,new ReservedThreadExecutor(executor,maxProducersPending)); this(producer,executor,new ReservedThreadExecutor(executor,maxReserved));
} }
public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers) public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers)
@ -184,7 +192,9 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{ {
// Could another one just have been queued with a produce call? // Could another one just have been queued with a produce call?
if (_state==State.REPRODUCING) if (_state==State.REPRODUCING)
{
_state = State.PRODUCING; _state = State.PRODUCING;
}
else else
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -194,35 +204,41 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
} }
} }
} }
else if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING)
{
// PRODUCE CONSUME (EWYK!)
if (LOG.isDebugEnabled())
LOG.debug("{} PC t={}",this,task);
task.run();
}
else else
{ {
boolean consume; boolean consume;
try (Lock locked = _locker.lock()) if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING)
{ {
if (_producers.tryExecute(this)) // PRODUCE CONSUME (EWYK!)
{ if (LOG.isDebugEnabled())
// EXECUTE PRODUCE CONSUME! LOG.debug("{} PC t={}", this, task);
// We have executed a new Producer, so we can EWYK consume consume = true;
_state = State.IDLE; _nonBlocking.increment();
producing = false;
consume = true;
}
else
{
// PRODUCE EXECUTE CONSUME!
consume = false;
}
} }
else
{
try (Lock locked = _locker.lock())
{
if (_producers.tryExecute(this))
{
// EXECUTE PRODUCE CONSUME!
// We have executed a new Producer, so we can EWYK consume
_state = State.IDLE;
producing = false;
consume = true;
_blocking.increment();
}
else
{
// PRODUCE EXECUTE CONSUME!
consume = false;
_executed.increment();
}
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} {} t={}",this,consume?"EPC":"PEC",task); LOG.debug("{} {} t={}", this, consume ? "EPC" : "PEC", task);
}
// Consume or execute task // Consume or execute task
try try
@ -232,7 +248,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
else else
_executor.execute(task); _executor.execute(task);
} }
catch(RejectedExecutionException e) catch (RejectedExecutionException e)
{ {
LOG.warn(e); LOG.warn(e);
if (task instanceof Closeable) if (task instanceof Closeable)
@ -241,13 +257,13 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{ {
((Closeable)task).close(); ((Closeable)task).close();
} }
catch(Throwable e2) catch (Throwable e2)
{ {
LOG.ignore(e2); LOG.ignore(e2);
} }
} }
} }
catch(Throwable e) catch (Throwable e)
{ {
LOG.warn(e); LOG.warn(e);
} }
@ -257,7 +273,26 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
return producing; return producing;
} }
public Boolean isIdle() @ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true)
public long getNonBlockingTasksConsumed()
{
return _nonBlocking.longValue();
}
@ManagedAttribute(value = "number of blocking tasks consumed", readonly = true)
public long getBlockingTasksConsumed()
{
return _blocking.longValue();
}
@ManagedAttribute(value = "number of blocking tasks executed", readonly = true)
public long getBlockingTasksExecuted()
{
return _executed.longValue();
}
@ManagedAttribute(value = "whether this execution strategy is idle", readonly = true)
public boolean isIdle()
{ {
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())
{ {
@ -265,6 +300,14 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
} }
} }
@ManagedOperation(value = "resets the task counts", impact = "ACTION")
public void reset()
{
_nonBlocking.reset();
_blocking.reset();
_executed.reset();
}
public String toString() public String toString()
{ {
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())

View File

@ -45,7 +45,6 @@ public class ReservedThreadExecutorTest
_pae.start(); _pae.start();
} }
@After @After
public void after() throws Exception public void after() throws Exception
{ {
@ -62,13 +61,13 @@ public class ReservedThreadExecutorTest
assertThat(_pae.getCapacity(),is(SIZE)); assertThat(_pae.getCapacity(),is(SIZE));
long started = System.nanoTime(); long started = System.nanoTime();
while (_pae.getPreallocated()<SIZE) while (_pae.getAvailable()<SIZE)
{ {
if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10) if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10)
break; break;
Thread.sleep(100); Thread.sleep(100);
} }
assertThat(_pae.getPreallocated(),is(SIZE)); assertThat(_pae.getAvailable(),is(SIZE));
} }
@Test @Test
@ -84,28 +83,28 @@ public class ReservedThreadExecutorTest
_executor.execute(); _executor.execute();
long started = System.nanoTime(); long started = System.nanoTime();
while (_pae.getPreallocated()<SIZE) while (_pae.getAvailable()<SIZE)
{ {
if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10) if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10)
break; break;
Thread.sleep(100); Thread.sleep(100);
} }
assertThat(_executor._queue.size(),is(0)); assertThat(_executor._queue.size(),is(0));
assertThat(_pae.getPreallocated(),is(SIZE)); assertThat(_pae.getAvailable(),is(SIZE));
for (int i=SIZE;i-->0;) for (int i=SIZE;i-->0;)
assertThat(_pae.tryExecute(new Task()),is(true)); assertThat(_pae.tryExecute(new Task()),is(true));
assertThat(_executor._queue.size(),is(1)); assertThat(_executor._queue.size(),is(1));
assertThat(_pae.getPreallocated(),is(0)); assertThat(_pae.getAvailable(),is(0));
for (int i=SIZE;i-->0;) for (int i=SIZE;i-->0;)
assertThat(_pae.tryExecute(new NOOP()),is(false)); assertThat(_pae.tryExecute(new NOOP()),is(false));
assertThat(_executor._queue.size(),is(SIZE)); assertThat(_executor._queue.size(),is(SIZE));
assertThat(_pae.getPreallocated(),is(0)); assertThat(_pae.getAvailable(),is(0));
assertThat(_pae.tryExecute(new NOOP()),is(false)); assertThat(_pae.tryExecute(new NOOP()),is(false));
assertThat(_executor._queue.size(),is(SIZE)); assertThat(_executor._queue.size(),is(SIZE));
assertThat(_pae.getPreallocated(),is(0)); assertThat(_pae.getAvailable(),is(0));
} }
@Test @Test
@ -114,13 +113,13 @@ public class ReservedThreadExecutorTest
while(!_executor._queue.isEmpty()) while(!_executor._queue.isEmpty())
_executor.execute(); _executor.execute();
long started = System.nanoTime(); long started = System.nanoTime();
while (_pae.getPreallocated()<SIZE) while (_pae.getAvailable()<SIZE)
{ {
if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10) if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10)
break; break;
Thread.sleep(100); Thread.sleep(100);
} }
assertThat(_pae.getPreallocated(),is(SIZE)); assertThat(_pae.getAvailable(),is(SIZE));
Task[] task = new Task[SIZE]; Task[] task = new Task[SIZE];
for (int i=SIZE;i-->0;) for (int i=SIZE;i-->0;)
@ -147,18 +146,15 @@ public class ReservedThreadExecutorTest
} }
started = System.nanoTime(); started = System.nanoTime();
while (_pae.getPreallocated()<SIZE) while (_pae.getAvailable()<SIZE)
{ {
if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10) if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10)
break; break;
Thread.sleep(100); Thread.sleep(100);
} }
assertThat(_pae.getPreallocated(),is(SIZE)); assertThat(_pae.getAvailable(),is(SIZE));
} }
private static class TestExecutor implements Executor private static class TestExecutor implements Executor
{ {
Deque<Runnable> _queue = new ArrayDeque<>(); Deque<Runnable> _queue = new ArrayDeque<>();