diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 5010a631854..ace1729763e 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -22,6 +22,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.Condition; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -32,6 +34,7 @@ import org.eclipse.jetty.util.log.Logger; * with a Thread immediately being assigned the Runnable task, or fail if no Thread is * available. Threads are preallocated up to the capacity from a wrapped {@link Executor}. */ +@ManagedObject("A pool for reserved threads") public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor { private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class); @@ -79,19 +82,30 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { return _executor; } - + + @ManagedAttribute(value = "max number of reserved threads", readonly = true) public int getCapacity() { return _queue.length; } - - public int getPreallocated() + + @ManagedAttribute(value = "available reserved threads", readonly = true) + public int getAvailable() { try (Locker.Lock lock = _locker.lock()) { return _size; } } + + @ManagedAttribute(value = "pending reserved threads", readonly = true) + public int getPending() + { + try (Locker.Lock lock = _locker.lock()) + { + return _pending; + } + } @Override public void doStart() throws Exception diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index 58c38c07bbf..079e9e2684d 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -21,7 +21,11 @@ package org.eclipse.jetty.util.thread.strategy; import java.io.Closeable; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.LongAdder; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.ManagedOperation; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -57,6 +61,7 @@ import org.eclipse.jetty.util.thread.ReservedThreadExecutor; * sub-strategy is called ProduceExecuteConsume (PEC). *

*/ +@ManagedObject("eat what you kill execution strategy") public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrategy, Runnable { private static final Logger LOG = Log.getLogger(EatWhatYouKill.class); @@ -64,19 +69,22 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat private enum State { IDLE, PRODUCING, REPRODUCING } private final Locker _locker = new Locker(); - private State _state = State.IDLE; + private final LongAdder _nonBlocking = new LongAdder(); + private final LongAdder _blocking = new LongAdder(); + private final LongAdder _executed = new LongAdder(); private final Producer _producer; private final Executor _executor; private final ReservedThreadExecutor _producers; + private State _state = State.IDLE; public EatWhatYouKill(Producer producer, Executor executor) { this(producer,executor,new ReservedThreadExecutor(executor,1)); } - public EatWhatYouKill(Producer producer, Executor executor, int maxProducersPending) + public EatWhatYouKill(Producer producer, Executor executor, int maxReserved) { - this(producer,executor,new ReservedThreadExecutor(executor,maxProducersPending)); + this(producer,executor,new ReservedThreadExecutor(executor,maxReserved)); } public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers) @@ -184,7 +192,9 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat { // Could another one just have been queued with a produce call? if (_state==State.REPRODUCING) + { _state = State.PRODUCING; + } else { if (LOG.isDebugEnabled()) @@ -194,36 +204,42 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat } } } - else if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING) - { - // PRODUCE CONSUME (EWYK!) - if (LOG.isDebugEnabled()) - LOG.debug("{} PC t={}",this,task); - task.run(); - } else { boolean consume; - try (Lock locked = _locker.lock()) + if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING) { - if (_producers.tryExecute(this)) + // PRODUCE CONSUME (EWYK!) + if (LOG.isDebugEnabled()) + LOG.debug("{} PC t={}", this, task); + consume = true; + _nonBlocking.increment(); + } + else + { + try (Lock locked = _locker.lock()) { - // EXECUTE PRODUCE CONSUME! - // We have executed a new Producer, so we can EWYK consume - _state = State.IDLE; - producing = false; - consume = true; - } - else - { - // PRODUCE EXECUTE CONSUME! - consume = false; + if (_producers.tryExecute(this)) + { + // EXECUTE PRODUCE CONSUME! + // We have executed a new Producer, so we can EWYK consume + _state = State.IDLE; + producing = false; + consume = true; + _blocking.increment(); + } + else + { + // PRODUCE EXECUTE CONSUME! + consume = false; + _executed.increment(); + } } + + if (LOG.isDebugEnabled()) + LOG.debug("{} {} t={}", this, consume ? "EPC" : "PEC", task); } - if (LOG.isDebugEnabled()) - LOG.debug("{} {} t={}",this,consume?"EPC":"PEC",task); - // Consume or execute task try { @@ -232,7 +248,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat else _executor.execute(task); } - catch(RejectedExecutionException e) + catch (RejectedExecutionException e) { LOG.warn(e); if (task instanceof Closeable) @@ -241,13 +257,13 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat { ((Closeable)task).close(); } - catch(Throwable e2) + catch (Throwable e2) { LOG.ignore(e2); } } } - catch(Throwable e) + catch (Throwable e) { LOG.warn(e); } @@ -257,7 +273,26 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat return producing; } - public Boolean isIdle() + @ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true) + public long getNonBlockingTasksConsumed() + { + return _nonBlocking.longValue(); + } + + @ManagedAttribute(value = "number of blocking tasks consumed", readonly = true) + public long getBlockingTasksConsumed() + { + return _blocking.longValue(); + } + + @ManagedAttribute(value = "number of blocking tasks executed", readonly = true) + public long getBlockingTasksExecuted() + { + return _executed.longValue(); + } + + @ManagedAttribute(value = "whether this execution strategy is idle", readonly = true) + public boolean isIdle() { try (Lock locked = _locker.lock()) { @@ -265,6 +300,14 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat } } + @ManagedOperation(value = "resets the task counts", impact = "ACTION") + public void reset() + { + _nonBlocking.reset(); + _blocking.reset(); + _executed.reset(); + } + public String toString() { try (Lock locked = _locker.lock()) diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java index f6209683a59..83702750ec5 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java @@ -45,7 +45,6 @@ public class ReservedThreadExecutorTest _pae.start(); } - @After public void after() throws Exception { @@ -62,13 +61,13 @@ public class ReservedThreadExecutorTest assertThat(_pae.getCapacity(),is(SIZE)); long started = System.nanoTime(); - while (_pae.getPreallocated()10) break; Thread.sleep(100); } - assertThat(_pae.getPreallocated(),is(SIZE)); + assertThat(_pae.getAvailable(),is(SIZE)); } @Test @@ -84,28 +83,28 @@ public class ReservedThreadExecutorTest _executor.execute(); long started = System.nanoTime(); - while (_pae.getPreallocated()10) break; Thread.sleep(100); } assertThat(_executor._queue.size(),is(0)); - assertThat(_pae.getPreallocated(),is(SIZE)); + assertThat(_pae.getAvailable(),is(SIZE)); for (int i=SIZE;i-->0;) assertThat(_pae.tryExecute(new Task()),is(true)); assertThat(_executor._queue.size(),is(1)); - assertThat(_pae.getPreallocated(),is(0)); + assertThat(_pae.getAvailable(),is(0)); for (int i=SIZE;i-->0;) assertThat(_pae.tryExecute(new NOOP()),is(false)); assertThat(_executor._queue.size(),is(SIZE)); - assertThat(_pae.getPreallocated(),is(0)); + assertThat(_pae.getAvailable(),is(0)); assertThat(_pae.tryExecute(new NOOP()),is(false)); assertThat(_executor._queue.size(),is(SIZE)); - assertThat(_pae.getPreallocated(),is(0)); + assertThat(_pae.getAvailable(),is(0)); } @Test @@ -114,13 +113,13 @@ public class ReservedThreadExecutorTest while(!_executor._queue.isEmpty()) _executor.execute(); long started = System.nanoTime(); - while (_pae.getPreallocated()10) break; Thread.sleep(100); } - assertThat(_pae.getPreallocated(),is(SIZE)); + assertThat(_pae.getAvailable(),is(SIZE)); Task[] task = new Task[SIZE]; for (int i=SIZE;i-->0;) @@ -147,18 +146,15 @@ public class ReservedThreadExecutorTest } started = System.nanoTime(); - while (_pae.getPreallocated()10) break; Thread.sleep(100); } - assertThat(_pae.getPreallocated(),is(SIZE)); - - + assertThat(_pae.getAvailable(),is(SIZE)); } - - + private static class TestExecutor implements Executor { Deque _queue = new ArrayDeque<>();