Add wait_for_no_initializing_shards to cluster health API (#27489)

This adds a new option to the cluster health request that allows waiting
until there are no initializing shards.

Closes #25623
This commit is contained in:
Nhat Nguyen 2017-11-23 15:09:58 -05:00 committed by GitHub
parent 93a988c557
commit 46b508d6c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 252 additions and 17 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.health;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActiveShardCount;
@ -40,6 +41,7 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
private TimeValue timeout = new TimeValue(30, TimeUnit.SECONDS);
private ClusterHealthStatus waitForStatus;
private boolean waitForNoRelocatingShards = false;
private boolean waitForNoInitializingShards = false;
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
private String waitForNodes = "";
private Priority waitForEvents = null;
@ -72,6 +74,9 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
if (in.readBoolean()) {
waitForEvents = Priority.readFrom(in);
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
waitForNoInitializingShards = in.readBoolean();
}
}
@Override
@ -101,6 +106,9 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
out.writeBoolean(true);
Priority.writeTo(waitForEvents, out);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(waitForNoInitializingShards);
}
}
@Override
@ -167,6 +175,21 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
return this;
}
/**
* Returns whether this request is configured to wait until there are no
* initializing shards. The field backing this getter is initialised to {@code false}.
*/
public boolean waitForNoInitializingShards() {
return waitForNoInitializingShards;
}
/**
* Sets whether the request should wait until there are no initializing shards before
* retrieving the cluster health status. Defaults to {@code false}, meaning the
* operation does not wait for initializing shards to finish. Set to {@code true}
* to wait until the number of initializing shards in the cluster is 0.
*
* @param waitForNoInitializingShards {@code true} to wait for no initializing shards
* @return this request, so that calls can be chained
*/
public ClusterHealthRequest waitForNoInitializingShards(boolean waitForNoInitializingShards) {
this.waitForNoInitializingShards = waitForNoInitializingShards;
return this;
}
public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

View File

@ -73,6 +73,17 @@ public class ClusterHealthRequestBuilder extends MasterNodeReadOperationRequestB
return this;
}
/**
* Sets whether the request should wait until there are no initializing shards before
* retrieving the cluster health status. Defaults to {@code false}, meaning the
* operation does not wait for initializing shards to finish. Set to {@code true}
* to wait until the number of initializing shards in the cluster is 0.
* The value is forwarded unchanged to the underlying request.
*
* @param waitForNoInitializingShards {@code true} to wait for no initializing shards
* @return this builder, so that calls can be chained
*/
public ClusterHealthRequestBuilder setWaitForNoInitializingShards(boolean waitForNoInitializingShards) {
request.waitForNoInitializingShards(waitForNoInitializingShards);
return this;
}
/**
* Sets the number of shard copies that must be active before getting the health status.
* Defaults to {@link ActiveShardCount#NONE}, meaning we don't wait on any active shards.

View File

@ -142,24 +142,26 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
}
private void executeHealth(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
int waitFor = 5;
if (request.waitForStatus() == null) {
waitFor--;
int waitFor = 0;
if (request.waitForStatus() != null) {
waitFor++;
}
if (request.waitForNoRelocatingShards() == false) {
waitFor--;
if (request.waitForNoRelocatingShards()) {
waitFor++;
}
if (request.waitForActiveShards().equals(ActiveShardCount.NONE)) {
waitFor--;
if (request.waitForNoInitializingShards()) {
waitFor++;
}
if (request.waitForNodes().isEmpty()) {
waitFor--;
if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
waitFor++;
}
if (request.indices() == null || request.indices().length == 0) { // check that they actually exists in the meta data
waitFor--;
if (request.waitForNodes().isEmpty() == false) {
waitFor++;
}
if (request.indices() != null && request.indices().length > 0) { // check that they actually exists in the meta data
waitFor++;
}
assert waitFor >= 0;
final ClusterState state = clusterService.state();
final ClusterStateObserver observer = new ClusterStateObserver(state, clusterService, null, logger, threadPool.getThreadContext());
if (request.timeout().millis() == 0) {
@ -196,13 +198,15 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
return prepareResponse(request, response, clusterState, waitFor);
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
return readyCounter == waitFor;
}
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
boolean valid = prepareResponse(request, response, clusterState, waitFor);
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
boolean valid = (readyCounter == waitFor);
assert valid || timedOut;
// we check for a timeout here since this method might be called from the wait_for_events
// response handler which might have timed out already.
@ -213,7 +217,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
return response;
}
private boolean prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response, ClusterState clusterState, final int waitFor) {
static int prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response,
final ClusterState clusterState, final IndexNameExpressionResolver indexNameExpressionResolver) {
int waitForCounter = 0;
if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) {
waitForCounter++;
@ -221,6 +226,9 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
if (request.waitForNoRelocatingShards() && response.getRelocatingShards() == 0) {
waitForCounter++;
}
if (request.waitForNoInitializingShards() && response.getInitializingShards() == 0) {
waitForCounter++;
}
if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
ActiveShardCount waitForActiveShards = request.waitForActiveShards();
assert waitForActiveShards.equals(ActiveShardCount.DEFAULT) == false :
@ -292,7 +300,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
}
}
}
return waitForCounter == waitFor;
return waitForCounter;
}

View File

@ -62,7 +62,9 @@ public class RestClusterHealthAction extends BaseRestHandler {
clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase(Locale.ROOT)));
}
clusterHealthRequest.waitForNoRelocatingShards(
request.paramAsBoolean("wait_for_no_relocating_shards", clusterHealthRequest.waitForNoRelocatingShards()));
request.paramAsBoolean("wait_for_no_relocating_shards", clusterHealthRequest.waitForNoRelocatingShards()));
// When the REST parameter is absent, keep the request's own wait_for_no_initializing_shards
// setting as the default (not the relocating-shards flag).
clusterHealthRequest.waitForNoInitializingShards(
    request.paramAsBoolean("wait_for_no_initializing_shards", clusterHealthRequest.waitForNoInitializingShards()));
if (request.hasParam("wait_for_relocating_shards")) {
// wait_for_relocating_shards has been removed in favor of wait_for_no_relocating_shards
throw new IllegalArgumentException("wait_for_relocating_shards has been removed, " +

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.health;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.core.IsEqual.equalTo;
public class ClusterHealthRequestTests extends ESTestCase {

    /**
     * Writes a randomized request to a byte stream, reads it back into a new
     * request, and checks that each wait condition is equal on both sides.
     */
    public void testSerialize() throws Exception {
        final ClusterHealthRequest expected = randomRequest();
        final ClusterHealthRequest actual;
        try (BytesStreamOutput output = new BytesStreamOutput()) {
            expected.writeTo(output);
            try (StreamInput input = output.bytes().streamInput()) {
                actual = new ClusterHealthRequest(input);
            }
        }

        assertThat(actual.waitForStatus(), equalTo(expected.waitForStatus()));
        assertThat(actual.waitForNodes(), equalTo(expected.waitForNodes()));
        assertThat(actual.waitForNoInitializingShards(), equalTo(expected.waitForNoInitializingShards()));
        assertThat(actual.waitForNoRelocatingShards(), equalTo(expected.waitForNoRelocatingShards()));
        assertThat(actual.waitForActiveShards(), equalTo(expected.waitForActiveShards()));
        assertThat(actual.waitForEvents(), equalTo(expected.waitForEvents()));
    }

    /**
     * Creates a request with every wait condition set to a random value.
     * Random values are drawn in a fixed order so a given test seed always
     * yields the same request.
     */
    ClusterHealthRequest randomRequest() {
        final ClusterHealthRequest randomized = new ClusterHealthRequest();

        final ClusterHealthStatus status = randomFrom(ClusterHealthStatus.values());
        randomized.waitForStatus(status);

        // optional comparison operator followed by a node count, e.g. ">=3" or "12"
        final String nodesExpression = randomFrom("", "<", "<=", ">", ">=") + between(0, 1000);
        randomized.waitForNodes(nodesExpression);

        final boolean noInitializing = randomBoolean();
        randomized.waitForNoInitializingShards(noInitializing);

        final boolean noRelocating = randomBoolean();
        randomized.waitForNoRelocatingShards(noRelocating);

        final int activeShards = randomIntBetween(0, 10);
        randomized.waitForActiveShards(activeShards);

        final Priority events = randomFrom(Priority.values());
        randomized.waitForEvents(events);

        return randomized;
    }
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.health;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import static org.hamcrest.core.IsEqual.equalTo;
public class TransportClusterHealthActionTests extends ESTestCase {
public void testWaitForInitializingShards() throws Exception {
final String[] indices = {"test"};
final ClusterHealthRequest request = new ClusterHealthRequest();
request.waitForNoInitializingShards(true);
ClusterState clusterState = randomClusterStateWithInitializingShards("test", 0);
ClusterHealthResponse response = new ClusterHealthResponse("", indices, clusterState);
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(1));
request.waitForNoInitializingShards(true);
clusterState = randomClusterStateWithInitializingShards("test", between(1, 10));
response = new ClusterHealthResponse("", indices, clusterState);
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));
request.waitForNoInitializingShards(false);
clusterState = randomClusterStateWithInitializingShards("test", randomInt(20));
response = new ClusterHealthResponse("", indices, clusterState);
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));
}
ClusterState randomClusterStateWithInitializingShards(String index, final int initializingShards) {
final IndexMetaData indexMetaData = IndexMetaData
.builder(index)
.settings(settings(Version.CURRENT))
.numberOfShards(between(1, 10))
.numberOfReplicas(randomInt(20))
.build();
final List<ShardRoutingState> shardRoutingStates = new ArrayList<>();
IntStream.range(0, between(1, 30)).forEach(i -> shardRoutingStates.add(randomFrom(
ShardRoutingState.STARTED, ShardRoutingState.UNASSIGNED, ShardRoutingState.RELOCATING)));
IntStream.range(0, initializingShards).forEach(i -> shardRoutingStates.add(ShardRoutingState.INITIALIZING));
Randomness.shuffle(shardRoutingStates);
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(indexMetaData.getIndex());
// Primary
{
ShardRoutingState state = shardRoutingStates.remove(0);
String node = state == ShardRoutingState.UNASSIGNED ? null : "node";
routingTable.addShard(
TestShardRouting.newShardRouting(shardId, node, "relocating", true, state)
);
}
// Replicas
for (int i = 0; i < shardRoutingStates.size(); i++) {
ShardRoutingState state = shardRoutingStates.get(i);
String node = state == ShardRoutingState.UNASSIGNED ? null : "node" + i;
routingTable.addShard(TestShardRouting.newShardRouting(shardId, node, "relocating"+i, randomBoolean(), state));
}
return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(MetaData.builder().put(indexMetaData, true))
.routingTable(RoutingTable.builder().add(routingTable.build()).build())
.build();
}
}

View File

@ -89,6 +89,11 @@ The cluster health API accepts the following request parameters:
for the cluster to have no shard relocations. Defaults to false, which means
it will not wait for relocating shards.
`wait_for_no_initializing_shards`::
A boolean value which controls whether to wait (until the timeout provided)
for the cluster to have no shard initializations. Defaults to false, which means
it will not wait for initializing shards.
`wait_for_active_shards`::
A number controlling how many active shards to wait for, `all` to wait
for all shards in the cluster to be active, or `0` to not wait. Defaults to `0`.

View File

@ -47,6 +47,10 @@
"type" : "boolean",
"description" : "Whether to wait until there are no relocating shards in the cluster"
},
"wait_for_no_initializing_shards": {
"type" : "boolean",
"description" : "Whether to wait until there are no initializing shards in the cluster"
},
"wait_for_status": {
"type" : "enum",
"options" : ["green","yellow","red"],

View File

@ -92,6 +92,27 @@
- match: { unassigned_shards: 0 }
- gte: { number_of_pending_tasks: 0 }
---
"cluster health basic test, one index with wait for no initializing shards":
- skip:
version: " - 6.99.99"
reason: "wait_for_no_initializing_shards is introduced in 7.0.0"
- do:
indices.create:
index: test_index
wait_for_active_shards: 0
body:
settings:
index:
number_of_replicas: 0
- do:
cluster.health:
wait_for_no_initializing_shards: true
- match: { initializing_shards: 0 }
---
"cluster health levels":
- do: