mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-28 07:59:10 +00:00
Cluster Health: Add max wait time for pending task and active shard percentage
In order to get a quick overview by simply checking the cluster state and its corresponding cat API, the following two attributes have been added to the cluster health response:
* task max waiting time: the time value of the first task in the queue, i.e. how long it has been waiting
* active shards percent: the percentage of all shards that are active (as opposed to, for example, still initializing)
This makes the cluster health API handy for checking when a fully restarted cluster is back up and running. Closes #10805
This commit is contained in:
parent
8b60083dda
commit
88f8d58c8b
@ -27,8 +27,10 @@ import org.elasticsearch.cluster.ClusterState;
|
|||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.RoutingTableValidation;
|
import org.elasticsearch.cluster.routing.RoutingTableValidation;
|
||||||
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.ToXContent;
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||||
@ -60,6 +62,8 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
|
|||||||
int numberOfPendingTasks = 0;
|
int numberOfPendingTasks = 0;
|
||||||
int numberOfInFlightFetch = 0;
|
int numberOfInFlightFetch = 0;
|
||||||
int delayedUnassignedShards = 0;
|
int delayedUnassignedShards = 0;
|
||||||
|
TimeValue taskMaxWaitingTime = TimeValue.timeValueMillis(0);
|
||||||
|
double activeShardsPercent = 100;
|
||||||
boolean timedOut = false;
|
boolean timedOut = false;
|
||||||
ClusterHealthStatus status = ClusterHealthStatus.RED;
|
ClusterHealthStatus status = ClusterHealthStatus.RED;
|
||||||
private List<String> validationFailures;
|
private List<String> validationFailures;
|
||||||
@ -70,15 +74,19 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
|
|||||||
|
|
||||||
/** needed for plugins BWC */
|
/** needed for plugins BWC */
|
||||||
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) {
|
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) {
|
||||||
this(clusterName, concreteIndices, clusterState, -1, -1, -1);
|
this(clusterName, concreteIndices, clusterState, -1, -1, -1, TimeValue.timeValueHours(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks,
|
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks,
|
||||||
int numberOfInFlightFetch, int delayedUnassignedShards) {
|
int numberOfInFlightFetch, int delayedUnassignedShards, TimeValue taskMaxWaitingTime) {
|
||||||
this.clusterName = clusterName;
|
this.clusterName = clusterName;
|
||||||
this.numberOfPendingTasks = numberOfPendingTasks;
|
this.numberOfPendingTasks = numberOfPendingTasks;
|
||||||
this.numberOfInFlightFetch = numberOfInFlightFetch;
|
this.numberOfInFlightFetch = numberOfInFlightFetch;
|
||||||
this.delayedUnassignedShards = delayedUnassignedShards;
|
this.delayedUnassignedShards = delayedUnassignedShards;
|
||||||
|
this.clusterName = clusterName;
|
||||||
|
this.numberOfPendingTasks = numberOfPendingTasks;
|
||||||
|
this.numberOfInFlightFetch = numberOfInFlightFetch;
|
||||||
|
this.taskMaxWaitingTime = taskMaxWaitingTime;
|
||||||
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
|
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
|
||||||
validationFailures = validation.failures();
|
validationFailures = validation.failures();
|
||||||
numberOfNodes = clusterState.nodes().size();
|
numberOfNodes = clusterState.nodes().size();
|
||||||
@ -116,6 +124,20 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
|
|||||||
} else if (clusterState.blocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE)) {
|
} else if (clusterState.blocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE)) {
|
||||||
status = ClusterHealthStatus.RED;
|
status = ClusterHealthStatus.RED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// shortcut on green
|
||||||
|
if (status.equals(ClusterHealthStatus.GREEN)) {
|
||||||
|
this.activeShardsPercent = 100;
|
||||||
|
} else {
|
||||||
|
List<ShardRouting> shardRoutings = clusterState.getRoutingTable().allShards();
|
||||||
|
int activeShardCount = 0;
|
||||||
|
int totalShardCount = 0;
|
||||||
|
for (ShardRouting shardRouting : shardRoutings) {
|
||||||
|
if (shardRouting.active()) activeShardCount++;
|
||||||
|
totalShardCount++;
|
||||||
|
}
|
||||||
|
this.activeShardsPercent = (((double) activeShardCount) / totalShardCount) * 100;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getClusterName() {
|
public String getClusterName() {
|
||||||
@ -200,6 +222,21 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
|
|||||||
return indices;
|
return indices;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @return The maximum wait time of all tasks in the queue
|
||||||
|
*/
|
||||||
|
public TimeValue getTaskMaxWaitingTime() {
|
||||||
|
return taskMaxWaitingTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The percentage of active shards, should be 100% in a green system
|
||||||
|
*/
|
||||||
|
public double getActiveShardsPercent() {
|
||||||
|
return activeShardsPercent;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<ClusterIndexHealth> iterator() {
|
public Iterator<ClusterIndexHealth> iterator() {
|
||||||
return indices.values().iterator();
|
return indices.values().iterator();
|
||||||
@ -244,6 +281,9 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
|
|||||||
if (in.getVersion().onOrAfter(Version.V_1_7_0)) {
|
if (in.getVersion().onOrAfter(Version.V_1_7_0)) {
|
||||||
delayedUnassignedShards= in.readInt();
|
delayedUnassignedShards= in.readInt();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
activeShardsPercent = in.readDouble();
|
||||||
|
taskMaxWaitingTime = TimeValue.readTimeValue(in);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -274,6 +314,8 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
|
|||||||
if (out.getVersion().onOrAfter(Version.V_1_7_0)) {
|
if (out.getVersion().onOrAfter(Version.V_1_7_0)) {
|
||||||
out.writeInt(delayedUnassignedShards);
|
out.writeInt(delayedUnassignedShards);
|
||||||
}
|
}
|
||||||
|
out.writeDouble(activeShardsPercent);
|
||||||
|
taskMaxWaitingTime.writeTo(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -299,6 +341,10 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
|
|||||||
static final XContentBuilderString NUMBER_OF_PENDING_TASKS = new XContentBuilderString("number_of_pending_tasks");
|
static final XContentBuilderString NUMBER_OF_PENDING_TASKS = new XContentBuilderString("number_of_pending_tasks");
|
||||||
static final XContentBuilderString NUMBER_OF_IN_FLIGHT_FETCH = new XContentBuilderString("number_of_in_flight_fetch");
|
static final XContentBuilderString NUMBER_OF_IN_FLIGHT_FETCH = new XContentBuilderString("number_of_in_flight_fetch");
|
||||||
static final XContentBuilderString DELAYED_UNASSIGNED_SHARDS = new XContentBuilderString("delayed_unassigned_shards");
|
static final XContentBuilderString DELAYED_UNASSIGNED_SHARDS = new XContentBuilderString("delayed_unassigned_shards");
|
||||||
|
static final XContentBuilderString TASK_MAX_WAIT_TIME_IN_QUEUE = new XContentBuilderString("task_max_waiting_in_queue");
|
||||||
|
static final XContentBuilderString TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS = new XContentBuilderString("task_max_waiting_in_queue_millis");
|
||||||
|
static final XContentBuilderString ACTIVE_SHARDS_PERCENT_AS_NUMBER = new XContentBuilderString("active_shards_percent_as_number");
|
||||||
|
static final XContentBuilderString ACTIVE_SHARDS_PERCENT = new XContentBuilderString("active_shards_percent");
|
||||||
static final XContentBuilderString ACTIVE_PRIMARY_SHARDS = new XContentBuilderString("active_primary_shards");
|
static final XContentBuilderString ACTIVE_PRIMARY_SHARDS = new XContentBuilderString("active_primary_shards");
|
||||||
static final XContentBuilderString ACTIVE_SHARDS = new XContentBuilderString("active_shards");
|
static final XContentBuilderString ACTIVE_SHARDS = new XContentBuilderString("active_shards");
|
||||||
static final XContentBuilderString RELOCATING_SHARDS = new XContentBuilderString("relocating_shards");
|
static final XContentBuilderString RELOCATING_SHARDS = new XContentBuilderString("relocating_shards");
|
||||||
@ -323,6 +369,8 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
|
|||||||
builder.field(Fields.DELAYED_UNASSIGNED_SHARDS, getDelayedUnassignedShards());
|
builder.field(Fields.DELAYED_UNASSIGNED_SHARDS, getDelayedUnassignedShards());
|
||||||
builder.field(Fields.NUMBER_OF_PENDING_TASKS, getNumberOfPendingTasks());
|
builder.field(Fields.NUMBER_OF_PENDING_TASKS, getNumberOfPendingTasks());
|
||||||
builder.field(Fields.NUMBER_OF_IN_FLIGHT_FETCH, getNumberOfInFlightFetch());
|
builder.field(Fields.NUMBER_OF_IN_FLIGHT_FETCH, getNumberOfInFlightFetch());
|
||||||
|
builder.timeValueField(Fields.TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS, Fields.TASK_MAX_WAIT_TIME_IN_QUEUE, getTaskMaxWaitingTime());
|
||||||
|
builder.percentageField(Fields.ACTIVE_SHARDS_PERCENT_AS_NUMBER, Fields.ACTIVE_SHARDS_PERCENT, getActiveShardsPercent());
|
||||||
|
|
||||||
String level = params.param("level", "cluster");
|
String level = params.param("level", "cluster");
|
||||||
boolean outputIndices = "indices".equals(level) || "shards".equals(level);
|
boolean outputIndices = "indices".equals(level) || "shards".equals(level);
|
||||||
|
@ -25,9 +25,6 @@ import org.elasticsearch.action.support.IndicesOptions;
|
|||||||
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
|
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
|
||||||
import org.elasticsearch.cluster.*;
|
import org.elasticsearch.cluster.*;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
||||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|
||||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
@ -170,12 +167,14 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
|
private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
|
||||||
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(), gatewayAllocator.getNumberOfInFlightFetch());
|
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(),
|
||||||
|
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMaxTaskWaitTime());
|
||||||
return prepareResponse(request, response, clusterState, waitFor);
|
return prepareResponse(request, response, clusterState, waitFor);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
|
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
|
||||||
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(), gatewayAllocator.getNumberOfInFlightFetch());
|
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(),
|
||||||
|
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMaxTaskWaitTime());
|
||||||
boolean valid = prepareResponse(request, response, clusterState, waitFor);
|
boolean valid = prepareResponse(request, response, clusterState, waitFor);
|
||||||
assert valid || timedOut;
|
assert valid || timedOut;
|
||||||
// we check for a timeout here since this method might be called from the wait_for_events
|
// we check for a timeout here since this method might be called from the wait_for_events
|
||||||
@ -259,20 +258,25 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState, int numberOfPendingTasks, int numberOfInFlightFetch) {
|
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState, int numberOfPendingTasks, int numberOfInFlightFetch,
|
||||||
|
TimeValue pendingTaskTimeInQueue) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Calculating health based on state version [{}]", clusterState.version());
|
logger.trace("Calculating health based on state version [{}]", clusterState.version());
|
||||||
}
|
}
|
||||||
|
|
||||||
String[] concreteIndices;
|
String[] concreteIndices;
|
||||||
try {
|
try {
|
||||||
concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
|
concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
|
||||||
} catch (IndexMissingException e) {
|
} catch (IndexMissingException e) {
|
||||||
// one of the specified indices is not there - treat it as RED.
|
// one of the specified indices is not there - treat it as RED.
|
||||||
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState, numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState));
|
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState,
|
||||||
|
numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState),
|
||||||
|
pendingTaskTimeInQueue);
|
||||||
response.status = ClusterHealthStatus.RED;
|
response.status = ClusterHealthStatus.RED;
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState));
|
return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks,
|
||||||
|
numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState), pendingTaskTimeInQueue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -120,4 +120,10 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
|
|||||||
*/
|
*/
|
||||||
int numberOfPendingTasks();
|
int numberOfPendingTasks();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the maximum wait time for tasks in the queue
|
||||||
|
*
|
||||||
|
* @returns A zero time value if the queue is empty, otherwise the time value oldest task waiting in the queue
|
||||||
|
*/
|
||||||
|
TimeValue getMaxTaskWaitTime();
|
||||||
}
|
}
|
||||||
|
@ -243,7 +243,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
|||||||
}
|
}
|
||||||
// call the post added notification on the same event thread
|
// call the post added notification on the same event thread
|
||||||
try {
|
try {
|
||||||
updateTasksExecutor.execute(new TimedPrioritizedRunnable(Priority.HIGH, "_add_listener_") {
|
updateTasksExecutor.execute(new PrioritizedRunnable(Priority.HIGH) {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
if (timeout != null) {
|
if (timeout != null) {
|
||||||
@ -312,10 +312,10 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
|||||||
final Object task = pending.task;
|
final Object task = pending.task;
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
continue;
|
continue;
|
||||||
} else if (task instanceof TimedPrioritizedRunnable) {
|
} else if (task instanceof UpdateTask) {
|
||||||
TimedPrioritizedRunnable runnable = (TimedPrioritizedRunnable) task;
|
UpdateTask runnable = (UpdateTask) task;
|
||||||
source = runnable.source();
|
source = runnable.source();
|
||||||
timeInQueue = runnable.timeSinceCreatedInMillis();
|
timeInQueue = runnable.getAgeInMillis();
|
||||||
} else {
|
} else {
|
||||||
assert false : "expected TimedPrioritizedRunnable got " + task.getClass();
|
assert false : "expected TimedPrioritizedRunnable got " + task.getClass();
|
||||||
source = "unknown [" + task.getClass() + "]";
|
source = "unknown [" + task.getClass() + "]";
|
||||||
@ -332,37 +332,26 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
|||||||
return updateTasksExecutor.getNumberOfPendingTasks();
|
return updateTasksExecutor.getNumberOfPendingTasks();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimeValue getMaxTaskWaitTime() {
|
||||||
|
return updateTasksExecutor.getMaxTaskWaitTime();
|
||||||
|
}
|
||||||
|
|
||||||
static abstract class TimedPrioritizedRunnable extends PrioritizedRunnable {
|
class UpdateTask extends PrioritizedRunnable {
|
||||||
private final long creationTimeNS;
|
|
||||||
|
public final ClusterStateUpdateTask updateTask;
|
||||||
protected final String source;
|
protected final String source;
|
||||||
|
|
||||||
protected TimedPrioritizedRunnable(Priority priority, String source) {
|
|
||||||
super(priority);
|
|
||||||
this.source = source;
|
|
||||||
this.creationTimeNS = System.nanoTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long timeSinceCreatedInMillis() {
|
UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
|
||||||
// max with 0 to make sure we always return a non negative number
|
super(priority);
|
||||||
// even if time shifts.
|
this.updateTask = updateTask;
|
||||||
return Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - creationTimeNS));
|
this.source = source;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String source() {
|
public String source() {
|
||||||
return source;
|
return source;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
class UpdateTask extends TimedPrioritizedRunnable {
|
|
||||||
|
|
||||||
public final ClusterStateUpdateTask updateTask;
|
|
||||||
|
|
||||||
|
|
||||||
UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
|
|
||||||
super(priority, source);
|
|
||||||
this.updateTask = updateTask;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -20,7 +20,10 @@
|
|||||||
package org.elasticsearch.common.util.concurrent;
|
package org.elasticsearch.common.util.concurrent;
|
||||||
|
|
||||||
|
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An extension to thread pool executor, allowing (in the future) to add specific additional stats to it.
|
* An extension to thread pool executor, allowing (in the future) to add specific additional stats to it.
|
||||||
@ -67,8 +70,8 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static interface ShutdownListener {
|
public interface ShutdownListener {
|
||||||
public void onTerminated();
|
void onTerminated();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||||||
*/
|
*/
|
||||||
public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
|
public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
|
||||||
|
|
||||||
|
private static final TimeValue NO_WAIT_TIME_VALUE = TimeValue.timeValueMillis(0);
|
||||||
private AtomicLong insertionOrder = new AtomicLong();
|
private AtomicLong insertionOrder = new AtomicLong();
|
||||||
private Queue<Runnable> current = ConcurrentCollections.newQueue();
|
private Queue<Runnable> current = ConcurrentCollections.newQueue();
|
||||||
|
|
||||||
@ -56,6 +57,26 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
|
|||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the waiting time of the first task in the queue
|
||||||
|
*/
|
||||||
|
public TimeValue getMaxTaskWaitTime() {
|
||||||
|
if (getQueue().size() == 0) {
|
||||||
|
return NO_WAIT_TIME_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
long now = System.nanoTime();
|
||||||
|
long oldestCreationDateInNanos = now;
|
||||||
|
for (Runnable queuedRunnable : getQueue()) {
|
||||||
|
if (queuedRunnable instanceof PrioritizedRunnable) {
|
||||||
|
oldestCreationDateInNanos = Math.min(oldestCreationDateInNanos,
|
||||||
|
((PrioritizedRunnable) queuedRunnable).getCreationDateInNanos());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TimeValue.timeValueNanos(now - oldestCreationDateInNanos);
|
||||||
|
}
|
||||||
|
|
||||||
private void addPending(List<Runnable> runnables, List<Pending> pending, boolean executing) {
|
private void addPending(List<Runnable> runnables, List<Pending> pending, boolean executing) {
|
||||||
for (Runnable runnable : runnables) {
|
for (Runnable runnable : runnables) {
|
||||||
if (runnable instanceof TieBreakingPrioritizedRunnable) {
|
if (runnable instanceof TieBreakingPrioritizedRunnable) {
|
||||||
@ -191,7 +212,6 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
|
|||||||
timeoutFuture = null;
|
timeoutFuture = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
|
private final class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
package org.elasticsearch.common.util.concurrent;
|
package org.elasticsearch.common.util.concurrent;
|
||||||
|
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
@ -26,6 +27,7 @@ import org.elasticsearch.common.Priority;
|
|||||||
public abstract class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
|
public abstract class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
|
||||||
|
|
||||||
private final Priority priority;
|
private final Priority priority;
|
||||||
|
private final long creationDate;
|
||||||
|
|
||||||
public static PrioritizedRunnable wrap(Runnable runnable, Priority priority) {
|
public static PrioritizedRunnable wrap(Runnable runnable, Priority priority) {
|
||||||
return new Wrapped(runnable, priority);
|
return new Wrapped(runnable, priority);
|
||||||
@ -33,6 +35,15 @@ public abstract class PrioritizedRunnable implements Runnable, Comparable<Priori
|
|||||||
|
|
||||||
protected PrioritizedRunnable(Priority priority) {
|
protected PrioritizedRunnable(Priority priority) {
|
||||||
this.priority = priority;
|
this.priority = priority;
|
||||||
|
creationDate = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCreationDateInNanos() {
|
||||||
|
return creationDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getAgeInMillis() {
|
||||||
|
return Math.max(0, (System.nanoTime() - creationDate) / 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -43,6 +43,7 @@ import java.math.BigDecimal;
|
|||||||
import java.math.RoundingMode;
|
import java.math.RoundingMode;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -957,6 +958,14 @@ public final class XContentBuilder implements BytesStream, Releasable {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public XContentBuilder percentageField(XContentBuilderString rawFieldName, XContentBuilderString readableFieldName, double percentage) throws IOException {
|
||||||
|
if (humanReadable) {
|
||||||
|
field(readableFieldName, String.format(Locale.ROOT, "%1.1f%%", percentage));
|
||||||
|
}
|
||||||
|
field(rawFieldName, percentage);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public XContentBuilder value(Boolean value) throws IOException {
|
public XContentBuilder value(Boolean value) throws IOException {
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
return nullValue();
|
return nullValue();
|
||||||
|
@ -80,6 +80,8 @@ public class RestHealthAction extends AbstractCatAction {
|
|||||||
t.addCell("init", "alias:i,shards.initializing,shardsInitializing;text-align:right;desc:number of initializing nodes");
|
t.addCell("init", "alias:i,shards.initializing,shardsInitializing;text-align:right;desc:number of initializing nodes");
|
||||||
t.addCell("unassign", "alias:u,shards.unassigned,shardsUnassigned;text-align:right;desc:number of unassigned shards");
|
t.addCell("unassign", "alias:u,shards.unassigned,shardsUnassigned;text-align:right;desc:number of unassigned shards");
|
||||||
t.addCell("pending_tasks", "alias:pt,pendingTasks;text-align:right;desc:number of pending tasks");
|
t.addCell("pending_tasks", "alias:pt,pendingTasks;text-align:right;desc:number of pending tasks");
|
||||||
|
t.addCell("max_task_wait_time", "alias:mtwt,maxTaskWaitTime;text-align:right;desc:wait time of longest task pending");
|
||||||
|
t.addCell("active_shards_percent", "alias:asp,activeShardsPercent;text-align:right;desc:active number of shards in percent");
|
||||||
t.endHeaders();
|
t.endHeaders();
|
||||||
|
|
||||||
return t;
|
return t;
|
||||||
@ -103,6 +105,8 @@ public class RestHealthAction extends AbstractCatAction {
|
|||||||
t.addCell(health.getInitializingShards());
|
t.addCell(health.getInitializingShards());
|
||||||
t.addCell(health.getUnassignedShards());
|
t.addCell(health.getUnassignedShards());
|
||||||
t.addCell(health.getNumberOfPendingTasks());
|
t.addCell(health.getNumberOfPendingTasks());
|
||||||
|
t.addCell(health.getTaskMaxWaitingTime().millis() == 0 ? "-" : health.getTaskMaxWaitingTime());
|
||||||
|
t.addCell(String.format(Locale.ROOT, "%1.1f%%", health.getActiveShardsPercent()));
|
||||||
t.endRow();
|
t.endRow();
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||||||
import org.elasticsearch.cluster.routing.*;
|
import org.elasticsearch.cluster.routing.*;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
@ -38,8 +39,9 @@ import org.junit.Test;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.allOf;
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.empty;
|
import static org.hamcrest.Matchers.*;
|
||||||
|
|
||||||
public class ClusterHealthResponsesTests extends ElasticsearchTestCase {
|
public class ClusterHealthResponsesTests extends ElasticsearchTestCase {
|
||||||
|
|
||||||
@ -193,13 +195,16 @@ public class ClusterHealthResponsesTests extends ElasticsearchTestCase {
|
|||||||
int pendingTasks = randomIntBetween(0, 200);
|
int pendingTasks = randomIntBetween(0, 200);
|
||||||
int inFlight = randomIntBetween(0, 200);
|
int inFlight = randomIntBetween(0, 200);
|
||||||
int delayedUnassigned = randomIntBetween(0, 200);
|
int delayedUnassigned = randomIntBetween(0, 200);
|
||||||
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, pendingTasks, inFlight, delayedUnassigned);
|
TimeValue pendingTaskInQueueTime = TimeValue.timeValueMillis(randomIntBetween(1000, 100000));
|
||||||
|
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, pendingTasks, inFlight, delayedUnassigned, pendingTaskInQueueTime);
|
||||||
logger.info("cluster status: {}, expected {}", clusterHealth.getStatus(), counter.status());
|
logger.info("cluster status: {}, expected {}", clusterHealth.getStatus(), counter.status());
|
||||||
clusterHealth = maybeSerialize(clusterHealth);
|
clusterHealth = maybeSerialize(clusterHealth);
|
||||||
assertClusterHealth(clusterHealth, counter);
|
assertClusterHealth(clusterHealth, counter);
|
||||||
assertThat(clusterHealth.getNumberOfPendingTasks(), Matchers.equalTo(pendingTasks));
|
assertThat(clusterHealth.getNumberOfPendingTasks(), Matchers.equalTo(pendingTasks));
|
||||||
assertThat(clusterHealth.getNumberOfInFlightFetch(), Matchers.equalTo(inFlight));
|
assertThat(clusterHealth.getNumberOfInFlightFetch(), Matchers.equalTo(inFlight));
|
||||||
assertThat(clusterHealth.getDelayedUnassignedShards(), Matchers.equalTo(delayedUnassigned));
|
assertThat(clusterHealth.getDelayedUnassignedShards(), Matchers.equalTo(delayedUnassigned));
|
||||||
|
assertThat(clusterHealth.getTaskMaxWaitingTime().millis(), is(pendingTaskInQueueTime.millis()));
|
||||||
|
assertThat(clusterHealth.getActiveShardsPercent(), is(allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0))));
|
||||||
}
|
}
|
||||||
|
|
||||||
ClusterHealthResponse maybeSerialize(ClusterHealthResponse clusterHealth) throws IOException {
|
ClusterHealthResponse maybeSerialize(ClusterHealthResponse clusterHealth) throws IOException {
|
||||||
@ -227,7 +232,7 @@ public class ClusterHealthResponsesTests extends ElasticsearchTestCase {
|
|||||||
metaData.put(indexMetaData, true);
|
metaData.put(indexMetaData, true);
|
||||||
routingTable.add(indexRoutingTable);
|
routingTable.add(indexRoutingTable);
|
||||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
|
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
|
||||||
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, 0, 0, 0);
|
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, 0, 0, 0, TimeValue.timeValueMillis(0));
|
||||||
clusterHealth = maybeSerialize(clusterHealth);
|
clusterHealth = maybeSerialize(clusterHealth);
|
||||||
// currently we have no cluster level validation failures as index validation issues are reported per index.
|
// currently we have no cluster level validation failures as index validation issues are reported per index.
|
||||||
assertThat(clusterHealth.getValidationFailures(), Matchers.hasSize(0));
|
assertThat(clusterHealth.getValidationFailures(), Matchers.hasSize(0));
|
||||||
|
@ -18,7 +18,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.elasticsearch.test.cluster;
|
package org.elasticsearch.test.cluster;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.*;
|
import org.elasticsearch.cluster.*;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||||
@ -135,6 +134,11 @@ public class NoopClusterService implements ClusterService {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimeValue getMaxTaskWaitTime() {
|
||||||
|
return TimeValue.timeValueMillis(0);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Lifecycle.State lifecycleState() {
|
public Lifecycle.State lifecycleState() {
|
||||||
return null;
|
return null;
|
||||||
|
@ -192,6 +192,11 @@ public class TestClusterService implements ClusterService {
|
|||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimeValue getMaxTaskWaitTime() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Lifecycle.State lifecycleState() {
|
public Lifecycle.State lifecycleState() {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
@ -18,6 +18,8 @@
|
|||||||
init .+ \n
|
init .+ \n
|
||||||
unassign .+ \n
|
unassign .+ \n
|
||||||
pending_tasks .+ \n
|
pending_tasks .+ \n
|
||||||
|
max_task_wait_time .+ \n
|
||||||
|
active_shards_percent .+ \n
|
||||||
|
|
||||||
$/
|
$/
|
||||||
|
|
||||||
@ -44,6 +46,8 @@
|
|||||||
\d+ \s+ # init
|
\d+ \s+ # init
|
||||||
\d+ \s+ # unassign
|
\d+ \s+ # unassign
|
||||||
\d+ \s+ # pending_tasks
|
\d+ \s+ # pending_tasks
|
||||||
|
(-|\d+[.]\d+ms|s) \s+ # max task waiting time
|
||||||
|
\d+\.\d+% \s+ # active shards percent
|
||||||
\n
|
\n
|
||||||
)+
|
)+
|
||||||
$/
|
$/
|
||||||
|
Loading…
x
Reference in New Issue
Block a user