From 88f8d58c8bec33494139a10c921afd55d3895d7c Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Mon, 22 Jun 2015 14:59:35 +0200 Subject: [PATCH] Cluster Health: Add max wait time for pending task and active shard percentage In order to get a quick overview using by simply checking the cluster state and its corresponding cat API, the following two attributes have been added to the cluster health response: * task max waiting time, the time value of the first task of the queue and how long it has been waiting * active shards percent: The percentage of the number of shards that are in initializing state This makes the cluster health API handy to check, when a fully restarted cluster is back up and running. Closes #10805 --- .../cluster/health/ClusterHealthResponse.java | 52 ++++++++++++++++++- .../health/TransportClusterHealthAction.java | 20 ++++--- .../elasticsearch/cluster/ClusterService.java | 6 +++ .../service/InternalClusterService.java | 41 ++++++--------- .../util/concurrent/EsThreadPoolExecutor.java | 9 ++-- .../PrioritizedEsThreadPoolExecutor.java | 22 +++++++- .../util/concurrent/PrioritizedRunnable.java | 11 ++++ .../common/xcontent/XContentBuilder.java | 9 ++++ .../rest/action/cat/RestHealthAction.java | 4 ++ .../cluster/ClusterHealthResponsesTests.java | 11 ++-- .../test/cluster/NoopClusterService.java | 6 ++- .../test/cluster/TestClusterService.java | 5 ++ .../test/cat.health/10_basic.yaml | 4 ++ 13 files changed, 156 insertions(+), 44 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java index a87df801a63..258d6072201 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java @@ -27,8 +27,10 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTableValidation; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; @@ -60,6 +62,8 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable validationFailures; @@ -70,15 +74,19 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable shardRoutings = clusterState.getRoutingTable().allShards(); + int activeShardCount = 0; + int totalShardCount = 0; + for (ShardRouting shardRouting : shardRoutings) { + if (shardRouting.active()) activeShardCount++; + totalShardCount++; + } + this.activeShardsPercent = (((double) activeShardCount) / totalShardCount) * 100; + } } public String getClusterName() { @@ -200,6 +222,21 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable iterator() { return indices.values().iterator(); @@ -244,6 +281,9 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable { */ int numberOfPendingTasks(); + /** + * Returns the maximum wait time for tasks in the queue + * + * @returns A zero time value if the queue is empty, otherwise the time value oldest task waiting in the queue + */ + TimeValue getMaxTaskWaitTime(); } diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 621bfb9868f..964a94624fe 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -243,7 +243,7 @@ public class InternalClusterService extends AbstractLifecycleComponent current = ConcurrentCollections.newQueue(); @@ -56,6 +57,26 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { return size; } + /** + * Returns the waiting time of the first task in the queue + */ + public TimeValue getMaxTaskWaitTime() { + if (getQueue().size() == 0) { + return NO_WAIT_TIME_VALUE; + } + + long now = System.nanoTime(); + long oldestCreationDateInNanos = now; + for (Runnable queuedRunnable : getQueue()) { + if (queuedRunnable instanceof PrioritizedRunnable) { + oldestCreationDateInNanos = Math.min(oldestCreationDateInNanos, + ((PrioritizedRunnable) queuedRunnable).getCreationDateInNanos()); + } + } + + return TimeValue.timeValueNanos(now - oldestCreationDateInNanos); + } + private void addPending(List runnables, List pending, boolean executing) { for (Runnable runnable : runnables) { if (runnable instanceof TieBreakingPrioritizedRunnable) { @@ -191,7 +212,6 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { timeoutFuture = null; } } - } private final class PrioritizedFutureTask extends FutureTask implements Comparable { diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java index bd830ba75b9..09d644e664f 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.unit.TimeValue; /** * @@ -26,6 +27,7 @@ import org.elasticsearch.common.Priority; public abstract class PrioritizedRunnable implements Runnable, Comparable { private final Priority priority; + private final long creationDate; public static PrioritizedRunnable wrap(Runnable runnable, Priority priority) { return new Wrapped(runnable, priority); @@ -33,6 +35,15 @@ public abstract class PrioritizedRunnable implements Runnable, Comparable