API: add pending tasks count to cluster health

The number of currently pending tasks is useful for detecting an overloaded master. This commit adds it to the cluster health API. The complete list can be retrieved from the dedicated pending tasks API.
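A minimal sketch of checking the new field from the shell, assuming a node listening on localhost:9200 (the field name matches the docs change below):

% curl -XGET 'http://localhost:9200/_cluster/health?pretty=true'        # response now includes "number_of_pending_tasks"
% curl -XGET 'http://localhost:9200/_cluster/pending_tasks?pretty=true' # full task list from the dedicated pending tasks API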

It also adds REST tests for the cluster health API variants.

Closes #9877
Boaz Leskes 2015-02-25 13:25:52 +01:00
parent 1ed6451229
commit 6953777c3a
13 changed files with 195 additions and 33 deletions

View File

@@ -10,8 +10,8 @@ timestamping.
% curl 192.168.56.10:9200/_cat/health
1384308967 18:16:07 foo green 3 3 3 3 0 0 0
% curl '192.168.56.10:9200/_cat/health?v&ts=0'
cluster status nodeTotal nodeData shards pri relo init unassign
foo green 3 3 3 3 0 0 0
cluster status nodeTotal nodeData shards pri relo init unassign tasks
foo green 3 3 3 3 0 0 0 0
--------------------------------------------------
A common use of this command is to verify the health is consistent
@@ -21,11 +21,11 @@ across nodes:
--------------------------------------------------
% pssh -i -h list.of.cluster.hosts curl -s localhost:9200/_cat/health
[1] 20:20:52 [SUCCESS] es3.vm
1384309218 18:20:18 foo green 3 3 3 3 0 0 0
1384309218 18:20:18 foo green 3 3 3 3 0 0 0 0
[2] 20:20:52 [SUCCESS] es1.vm
1384309218 18:20:18 foo green 3 3 3 3 0 0 0
1384309218 18:20:18 foo green 3 3 3 3 0 0 0 0
[3] 20:20:52 [SUCCESS] es2.vm
1384309218 18:20:18 foo green 3 3 3 3 0 0 0
1384309218 18:20:18 foo green 3 3 3 3 0 0 0 0
--------------------------------------------------
A less obvious use is to track recovery of a large cluster over
@@ -36,9 +36,9 @@ to track its progress is by using this command in a delayed loop:
[source,shell]
--------------------------------------------------
% while true; do curl 192.168.56.10:9200/_cat/health; sleep 120; done
1384309446 18:24:06 foo red 3 3 20 20 0 0 1812
1384309566 18:26:06 foo yellow 3 3 950 916 0 12 870
1384309686 18:28:06 foo yellow 3 3 1328 916 0 12 492
1384309446 18:24:06 foo red 3 3 20 20 0 0 1812 0
1384309566 18:26:06 foo yellow 3 3 950 916 0 12 870 0
1384309686 18:28:06 foo yellow 3 3 1328 916 0 12 492 0
1384309806 18:30:06 foo green 3 3 1832 916 4 0 0
^C
--------------------------------------------------

View File

@@ -17,10 +17,13 @@ $ curl -XGET 'http://localhost:9200/_cluster/health?pretty=true'
"active_shards" : 10,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 0
"unassigned_shards" : 0,
"number_of_pending_tasks" : 0
}
--------------------------------------------------
coming[1.5.0, number of pending tasks was added in 1.5.0]
The API can also be executed against one or more indices to get just the
specified indices health:
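A minimal sketch, assuming a local node and illustrative index names (test1, test2):

[source,shell]
--------------------------------------------------
$ curl -XGET 'http://localhost:9200/_cluster/health/test1,test2?pretty=true'
--------------------------------------------------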

View File

@@ -0,0 +1,48 @@
---
"Help":
- do:
cat.health:
help: true
- match:
$body: |
/^ epoch .+ \n
timestamp .+ \n
cluster .+ \n
status .+ \n
node.total .+ \n
node.data .+ \n
shards .+ \n
pri .+ \n
relo .+ \n
init .+ \n
unassign .+ \n
pending_tasks .+ \n
$/
---
"Empty cluster":
- do:
cat.health: {}
- match:
$body: |
/^
( \d+ \s+ # epoch
\d\d:\d\d:\d\d \s+ # timestamp
\S+ \s+ # cluster
\w+ \s+ # status
\d+ \s+ # node.total
\d+ \s+ # node.data
\d+ \s+ # shards
\d+ \s+ # pri
\d+ \s+ # relo
\d+ \s+ # init
\d+ \s+ # unassign
\d+ \s+ # pending_tasks
\n
)+
$/

View File

@@ -0,0 +1,57 @@
---
"cluster health basic test":
- do:
cluster.health: {}
- is_true: cluster_name
- is_false: timed_out
- gte: { number_of_nodes: 1 }
- gte: { number_of_data_nodes: 1 }
- match: { active_primary_shards: 0 }
- match: { active_shards: 0 }
- match: { relocating_shards: 0 }
- match: { initializing_shards: 0 }
- match: { unassigned_shards: 0 }
- gte: { number_of_pending_tasks: 0 }
---
"cluster health basic test, one index":
- do:
indices.create:
index: test_index
- do:
cluster.health:
wait_for_status: green
- is_true: cluster_name
- is_false: timed_out
- gte: { number_of_nodes: 1 }
- gte: { number_of_data_nodes: 1 }
- gt: { active_primary_shards: 0 }
- gt: { active_shards: 0 }
- gte: { relocating_shards: 0 }
- match: { initializing_shards: 0 }
- match: { unassigned_shards: 0 }
- gte: { number_of_pending_tasks: 0 }
---
"cluster health levels":
- do:
indices.create:
index: test_index
- do:
cluster.health:
wait_for_status: green
level: indices
- is_true: indices
- is_false: indices.test_index.shards
- do:
cluster.health:
level: shards
- is_true: indices
- is_true: indices.test_index.shards

View File

@@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.health;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -56,6 +57,7 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
int activePrimaryShards = 0;
int initializingShards = 0;
int unassignedShards = 0;
int numberOfPendingTasks = 0;
boolean timedOut = false;
ClusterHealthStatus status = ClusterHealthStatus.RED;
private List<String> validationFailures;
@@ -69,8 +71,12 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
this.validationFailures = validationFailures;
}
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) {
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks) {
if (numberOfPendingTasks < 0) {
throw new ElasticsearchIllegalArgumentException("pending task should be non-negative. got [" + numberOfPendingTasks + "]");
}
this.clusterName = clusterName;
this.numberOfPendingTasks = numberOfPendingTasks;
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
validationFailures = validation.failures();
numberOfNodes = clusterState.nodes().size();
@@ -160,6 +166,10 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
return this.numberOfDataNodes;
}
public int getNumberOfPendingTasks() {
return this.numberOfPendingTasks;
}
/**
* <tt>true</tt> if the waitForXXX has timeout out and did not match.
*/
@@ -180,6 +190,13 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
return indices.values().iterator();
}
public static ClusterHealthResponse readResponseFrom(StreamInput in) throws IOException {
ClusterHealthResponse response = new ClusterHealthResponse();
response.readFrom(in);
return response;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@@ -191,6 +208,7 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
unassignedShards = in.readVInt();
numberOfNodes = in.readVInt();
numberOfDataNodes = in.readVInt();
numberOfPendingTasks = in.readInt();
status = ClusterHealthStatus.fromValue(in.readByte());
int size = in.readVInt();
for (int i = 0; i < size; i++) {
@@ -219,6 +237,7 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
out.writeVInt(unassignedShards);
out.writeVInt(numberOfNodes);
out.writeVInt(numberOfDataNodes);
out.writeInt(numberOfPendingTasks);
out.writeByte(status.value());
out.writeVInt(indices.size());
for (ClusterIndexHealth indexHealth : this) {
@@ -252,6 +271,7 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
static final XContentBuilderString TIMED_OUT = new XContentBuilderString("timed_out");
static final XContentBuilderString NUMBER_OF_NODES = new XContentBuilderString("number_of_nodes");
static final XContentBuilderString NUMBER_OF_DATA_NODES = new XContentBuilderString("number_of_data_nodes");
static final XContentBuilderString NUMBER_OF_PENDING_TASKS = new XContentBuilderString("number_of_pending_tasks");
static final XContentBuilderString ACTIVE_PRIMARY_SHARDS = new XContentBuilderString("active_primary_shards");
static final XContentBuilderString ACTIVE_SHARDS = new XContentBuilderString("active_shards");
static final XContentBuilderString RELOCATING_SHARDS = new XContentBuilderString("relocating_shards");
@@ -273,6 +293,7 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
builder.field(Fields.RELOCATING_SHARDS, getRelocatingShards());
builder.field(Fields.INITIALIZING_SHARDS, getInitializingShards());
builder.field(Fields.UNASSIGNED_SHARDS, getUnassignedShards());
builder.field(Fields.NUMBER_OF_PENDING_TASKS, getNumberOfPendingTasks());
String level = params.param("level", "cluster");
boolean outputIndices = "indices".equals(level) || "shards".equals(level);

View File

@@ -204,14 +204,7 @@ public class ClusterIndexHealth implements Iterable<ClusterShardHealth>, Streama
ClusterShardHealth shardHealth = readClusterShardHealth(in);
shards.put(shardHealth.getId(), shardHealth);
}
size = in.readVInt();
if (size == 0) {
validationFailures = ImmutableList.of();
} else {
for (int i = 0; i < size; i++) {
validationFailures.add(in.readString());
}
}
validationFailures = ImmutableList.copyOf(in.readStringArray());
}
@Override

View File

@@ -35,10 +35,6 @@ import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
@@ -168,12 +164,12 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati
}
private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
ClusterHealthResponse response = clusterHealth(request, clusterState);
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks());
return prepareResponse(request, response, clusterState, waitFor);
}
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
ClusterHealthResponse response = clusterHealth(request, clusterState);
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks());
boolean valid = prepareResponse(request, response, clusterState, waitFor);
assert valid || timedOut;
// we check for a timeout here since this method might be called from the wait_for_events
@@ -257,7 +253,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati
}
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState) {
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState, int numberOfPendingTasks) {
if (logger.isTraceEnabled()) {
logger.trace("Calculating health based on state version [{}]", clusterState.version());
}
@@ -266,11 +262,11 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati
concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
} catch (IndexMissingException e) {
// one of the specified indices is not there - treat it as RED.
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState);
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState, numberOfPendingTasks);
response.status = ClusterHealthStatus.RED;
return response;
}
return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState);
return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks);
}
}

View File

@@ -113,4 +113,9 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
*/
List<PendingClusterTask> pendingTasks();
/**
* Returns the number of currently pending tasks.
*/
int numberOfPendingTasks();
}

View File

@@ -319,6 +319,12 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return pendingClusterTasks;
}
@Override
public int numberOfPendingTasks() {
return updateTasksExecutor.getNumberOfPendingTasks();
}
static abstract class TimedPrioritizedRunnable extends PrioritizedRunnable {
private final long creationTime;
protected final String source;

View File

@@ -50,6 +50,12 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
return pending.toArray(new Pending[pending.size()]);
}
public int getNumberOfPendingTasks() {
int size = current.size();
size += getQueue().size();
return size;
}
private void addPending(List<Runnable> runnables, List<Pending> pending, boolean executing) {
for (Runnable runnable : runnables) {
if (runnable instanceof TieBreakingPrioritizedRunnable) {

View File

@@ -25,7 +25,10 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.support.RestResponseListener;
import org.elasticsearch.rest.action.support.RestTable;
import org.joda.time.format.DateTimeFormat;
@@ -76,6 +79,7 @@ public class RestHealthAction extends AbstractCatAction {
t.addCell("relo", "alias:r,shards.relocating,shardsRelocating;text-align:right;desc:number of relocating nodes");
t.addCell("init", "alias:i,shards.initializing,shardsInitializing;text-align:right;desc:number of initializing nodes");
t.addCell("unassign", "alias:u,shards.unassigned,shardsUnassigned;text-align:right;desc:number of unassigned shards");
t.addCell("pending_tasks", "alias:pt,pendingTasks;text-align:right;desc:number of pending tasks");
t.endHeaders();
return t;
@@ -98,6 +102,7 @@ public class RestHealthAction extends AbstractCatAction {
t.addCell(health.getRelocatingShards());
t.addCell(health.getInitializingShards());
t.addCell(health.getUnassignedShards());
t.addCell(health.getNumberOfPendingTasks());
t.endRow();
return t;
}

View File

@@ -29,11 +29,15 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.io.IOException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.empty;
@@ -177,7 +181,7 @@ public class ClusterHealthResponsesTests extends ElasticsearchTestCase {
}
@Test
public void testClusterHealth() {
public void testClusterHealth() throws IOException {
ShardCounter counter = new ShardCounter();
RoutingTable.Builder routingTable = RoutingTable.builder();
MetaData.Builder metaData = MetaData.builder();
@@ -190,14 +194,26 @@ public class ClusterHealthResponsesTests extends ElasticsearchTestCase {
routingTable.add(indexRoutingTable);
}
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[])null), clusterState);
int pendingTasks = randomIntBetween(0, 200);
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, pendingTasks);
logger.info("cluster status: {}, expected {}", clusterHealth.getStatus(), counter.status());
clusterHealth = maybeSerialize(clusterHealth);
assertClusterHealth(clusterHealth, counter);
assertThat(clusterHealth.getNumberOfPendingTasks(), Matchers.equalTo(pendingTasks));
}
ClusterHealthResponse maybeSerialize(ClusterHealthResponse clusterHealth) throws IOException {
if (randomBoolean()) {
BytesStreamOutput out = new BytesStreamOutput();
clusterHealth.writeTo(out);
BytesStreamInput in = new BytesStreamInput(out.bytes());
clusterHealth = ClusterHealthResponse.readResponseFrom(in);
}
return clusterHealth;
}
@Test
public void testValidations() {
public void testValidations() throws IOException {
IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(2).build();
ShardCounter counter = new ShardCounter();
IndexRoutingTable indexRoutingTable = genIndexRoutingTable(indexMetaData, counter);
@@ -211,7 +227,8 @@ public class ClusterHealthResponsesTests extends ElasticsearchTestCase {
metaData.put(indexMetaData, true);
routingTable.add(indexRoutingTable);
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[])null), clusterState);
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, 0);
clusterHealth = maybeSerialize(clusterHealth);
// currently we have no cluster level validation failures as index validation issues are reported per index.
assertThat(clusterHealth.getValidationFailures(), Matchers.hasSize(0));
}

View File

@@ -131,6 +131,11 @@ public class NoopClusterService implements ClusterService {
return null;
}
@Override
public int numberOfPendingTasks() {
return 0;
}
@Override
public Lifecycle.State lifecycleState() {
return null;