From 2ff3a80800e30e404765221a55081ec2f7c4b3e8 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Thu, 7 Sep 2017 16:01:20 +0200
Subject: [PATCH] Fixes #1798 - JMXify EatWhatYouKill.
---
.../util/thread/ReservedThreadExecutor.java | 20 +++-
.../util/thread/strategy/EatWhatYouKill.java | 101 +++++++++++++-----
.../thread/ReservedThreadExecutorTest.java | 28 +++--
3 files changed, 101 insertions(+), 48 deletions(-)
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java
index 5010a631854..ace1729763e 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java
@@ -22,6 +22,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition;
+import org.eclipse.jetty.util.annotation.ManagedAttribute;
+import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -32,6 +34,7 @@ import org.eclipse.jetty.util.log.Logger;
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is
* available. Threads are preallocated up to the capacity from a wrapped {@link Executor}.
*/
+@ManagedObject("A pool for reserved threads")
public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor
{
private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class);
@@ -79,19 +82,30 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
return _executor;
}
-
+
+ @ManagedAttribute(value = "max number of reserved threads", readonly = true)
public int getCapacity()
{
return _queue.length;
}
-
- public int getPreallocated()
+
+ @ManagedAttribute(value = "available reserved threads", readonly = true)
+ public int getAvailable()
{
try (Locker.Lock lock = _locker.lock())
{
return _size;
}
}
+
+ @ManagedAttribute(value = "pending reserved threads", readonly = true)
+ public int getPending()
+ {
+ try (Locker.Lock lock = _locker.lock())
+ {
+ return _pending;
+ }
+ }
@Override
public void doStart() throws Exception
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java
index 58c38c07bbf..079e9e2684d 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java
@@ -21,7 +21,11 @@ package org.eclipse.jetty.util.thread.strategy;
import java.io.Closeable;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.LongAdder;
+import org.eclipse.jetty.util.annotation.ManagedAttribute;
+import org.eclipse.jetty.util.annotation.ManagedObject;
+import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -57,6 +61,7 @@ import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
* sub-strategy is called ProduceExecuteConsume (PEC).
*
*/
+@ManagedObject("eat what you kill execution strategy")
public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrategy, Runnable
{
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);
@@ -64,19 +69,22 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
private enum State { IDLE, PRODUCING, REPRODUCING }
private final Locker _locker = new Locker();
- private State _state = State.IDLE;
+ private final LongAdder _nonBlocking = new LongAdder();
+ private final LongAdder _blocking = new LongAdder();
+ private final LongAdder _executed = new LongAdder();
private final Producer _producer;
private final Executor _executor;
private final ReservedThreadExecutor _producers;
+ private State _state = State.IDLE;
public EatWhatYouKill(Producer producer, Executor executor)
{
this(producer,executor,new ReservedThreadExecutor(executor,1));
}
- public EatWhatYouKill(Producer producer, Executor executor, int maxProducersPending)
+ public EatWhatYouKill(Producer producer, Executor executor, int maxReserved)
{
- this(producer,executor,new ReservedThreadExecutor(executor,maxProducersPending));
+ this(producer,executor,new ReservedThreadExecutor(executor,maxReserved));
}
public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers)
@@ -184,7 +192,9 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{
// Could another one just have been queued with a produce call?
if (_state==State.REPRODUCING)
+ {
_state = State.PRODUCING;
+ }
else
{
if (LOG.isDebugEnabled())
@@ -194,36 +204,42 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
}
}
}
- else if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING)
- {
- // PRODUCE CONSUME (EWYK!)
- if (LOG.isDebugEnabled())
- LOG.debug("{} PC t={}",this,task);
- task.run();
- }
else
{
boolean consume;
- try (Lock locked = _locker.lock())
+ if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING)
{
- if (_producers.tryExecute(this))
+ // PRODUCE CONSUME (EWYK!)
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} PC t={}", this, task);
+ consume = true;
+ _nonBlocking.increment();
+ }
+ else
+ {
+ try (Lock locked = _locker.lock())
{
- // EXECUTE PRODUCE CONSUME!
- // We have executed a new Producer, so we can EWYK consume
- _state = State.IDLE;
- producing = false;
- consume = true;
- }
- else
- {
- // PRODUCE EXECUTE CONSUME!
- consume = false;
+ if (_producers.tryExecute(this))
+ {
+ // EXECUTE PRODUCE CONSUME!
+ // We have executed a new Producer, so we can EWYK consume
+ _state = State.IDLE;
+ producing = false;
+ consume = true;
+ _blocking.increment();
+ }
+ else
+ {
+ // PRODUCE EXECUTE CONSUME!
+ consume = false;
+ _executed.increment();
+ }
}
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} {} t={}", this, consume ? "EPC" : "PEC", task);
}
- if (LOG.isDebugEnabled())
- LOG.debug("{} {} t={}",this,consume?"EPC":"PEC",task);
-
// Consume or execute task
try
{
@@ -232,7 +248,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
else
_executor.execute(task);
}
- catch(RejectedExecutionException e)
+ catch (RejectedExecutionException e)
{
LOG.warn(e);
if (task instanceof Closeable)
@@ -241,13 +257,13 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{
((Closeable)task).close();
}
- catch(Throwable e2)
+ catch (Throwable e2)
{
LOG.ignore(e2);
}
}
}
- catch(Throwable e)
+ catch (Throwable e)
{
LOG.warn(e);
}
@@ -257,7 +273,26 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
return producing;
}
- public Boolean isIdle()
+ @ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true)
+ public long getNonBlockingTasksConsumed()
+ {
+ return _nonBlocking.longValue();
+ }
+
+ @ManagedAttribute(value = "number of blocking tasks consumed", readonly = true)
+ public long getBlockingTasksConsumed()
+ {
+ return _blocking.longValue();
+ }
+
+ @ManagedAttribute(value = "number of blocking tasks executed", readonly = true)
+ public long getBlockingTasksExecuted()
+ {
+ return _executed.longValue();
+ }
+
+ @ManagedAttribute(value = "whether this execution strategy is idle", readonly = true)
+ public boolean isIdle()
{
try (Lock locked = _locker.lock())
{
@@ -265,6 +300,14 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
}
}
+ @ManagedOperation(value = "resets the task counts", impact = "ACTION")
+ public void reset()
+ {
+ _nonBlocking.reset();
+ _blocking.reset();
+ _executed.reset();
+ }
+
public String toString()
{
try (Lock locked = _locker.lock())
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java
index f6209683a59..83702750ec5 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java
@@ -45,7 +45,6 @@ public class ReservedThreadExecutorTest
_pae.start();
}
-
@After
public void after() throws Exception
{
@@ -62,13 +61,13 @@ public class ReservedThreadExecutorTest
assertThat(_pae.getCapacity(),is(SIZE));
long started = System.nanoTime();
- while (_pae.getPreallocated()10)
break;
Thread.sleep(100);
}
- assertThat(_pae.getPreallocated(),is(SIZE));
+ assertThat(_pae.getAvailable(),is(SIZE));
}
@Test
@@ -84,28 +83,28 @@ public class ReservedThreadExecutorTest
_executor.execute();
long started = System.nanoTime();
- while (_pae.getPreallocated()10)
break;
Thread.sleep(100);
}
assertThat(_executor._queue.size(),is(0));
- assertThat(_pae.getPreallocated(),is(SIZE));
+ assertThat(_pae.getAvailable(),is(SIZE));
for (int i=SIZE;i-->0;)
assertThat(_pae.tryExecute(new Task()),is(true));
assertThat(_executor._queue.size(),is(1));
- assertThat(_pae.getPreallocated(),is(0));
+ assertThat(_pae.getAvailable(),is(0));
for (int i=SIZE;i-->0;)
assertThat(_pae.tryExecute(new NOOP()),is(false));
assertThat(_executor._queue.size(),is(SIZE));
- assertThat(_pae.getPreallocated(),is(0));
+ assertThat(_pae.getAvailable(),is(0));
assertThat(_pae.tryExecute(new NOOP()),is(false));
assertThat(_executor._queue.size(),is(SIZE));
- assertThat(_pae.getPreallocated(),is(0));
+ assertThat(_pae.getAvailable(),is(0));
}
@Test
@@ -114,13 +113,13 @@ public class ReservedThreadExecutorTest
while(!_executor._queue.isEmpty())
_executor.execute();
long started = System.nanoTime();
- while (_pae.getPreallocated()10)
break;
Thread.sleep(100);
}
- assertThat(_pae.getPreallocated(),is(SIZE));
+ assertThat(_pae.getAvailable(),is(SIZE));
Task[] task = new Task[SIZE];
for (int i=SIZE;i-->0;)
@@ -147,18 +146,15 @@ public class ReservedThreadExecutorTest
}
started = System.nanoTime();
- while (_pae.getPreallocated()10)
break;
Thread.sleep(100);
}
- assertThat(_pae.getPreallocated(),is(SIZE));
-
-
+ assertThat(_pae.getAvailable(),is(SIZE));
}
-
-
+
private static class TestExecutor implements Executor
{
Deque _queue = new ArrayDeque<>();