From da510f28ab31308440386589a73fb869c45fcfb7 Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 24 Feb 2010 23:16:01 +0200 Subject: [PATCH] start work on cluster health api, still needs some polising, relates to #42. --- .../action/TransportActionModule.java | 2 + .../action/TransportActions.java | 1 + .../cluster/health/ClusterHealthRequest.java | 134 +++++++++++ .../cluster/health/ClusterHealthResponse.java | 121 ++++++++++ .../cluster/health/ClusterHealthStatus.java | 54 +++++ .../cluster/health/ClusterIndexHealth.java | 135 +++++++++++ .../cluster/health/ClusterShardHealth.java | 92 +++++++ .../health/TransportClusterHealthAction.java | 187 +++++++++++++++ .../broadcast/BroadcastOperationRequest.java | 22 +- .../client/ClusterAdminClient.java | 30 +++ .../org/elasticsearch/client/Requests.java | 12 + .../server/ServerClusterAdminClient.java | 20 +- .../action/ClientTransportActionModule.java | 4 +- .../ClientTransportClusterHealthAction.java | 42 ++++ .../InternalTransportClusterAdminClient.java | 33 ++- .../DefaultShardsRoutingStrategy.java | 1 - .../gateway/IndexShardGatewayService.java | 24 +- .../index/shard/recovery/RecoveryAction.java | 224 ++++++++++++------ .../rest/action/RestActionModule.java | 2 + .../health/RestClusterHealthAction.java | 142 +++++++++++ .../IndexLifecycleActionTests.java | 117 +++++++-- 21 files changed, 1290 insertions(+), 109 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthStatus.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterIndexHealth.java create mode 100644 
modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterShardHealth.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/health/ClientTransportClusterHealthAction.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java index da7201674b6..11772ce250d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java @@ -20,6 +20,7 @@ package org.elasticsearch.action; import com.google.inject.AbstractModule; +import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo; import org.elasticsearch.action.admin.cluster.ping.broadcast.TransportBroadcastPingAction; import org.elasticsearch.action.admin.cluster.ping.replication.TransportIndexReplicationPingAction; @@ -58,6 +59,7 @@ public class TransportActionModule extends AbstractModule { bind(TransportNodesInfo.class).asEagerSingleton(); bind(TransportClusterStateAction.class).asEagerSingleton(); + bind(TransportClusterHealthAction.class).asEagerSingleton(); bind(TransportSinglePingAction.class).asEagerSingleton(); bind(TransportBroadcastPingAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java index 5dc37d5da03..db73ee3351a 100644 --- 
a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java @@ -62,6 +62,7 @@ public class TransportActions { public static class Cluster { public static final String STATE = "/cluster/state"; + public static final String HEALTH = "/cluster/health"; public static class Node { public static final String INFO = "/cluster/nodes/info"; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java new file mode 100644 index 00000000000..56edc4f5b24 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java @@ -0,0 +1,134 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.admin.cluster.health; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.util.Strings; +import org.elasticsearch.util.TimeValue; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.util.TimeValue.*; + +/** + * @author kimchy (shay.banon) + */ +public class ClusterHealthRequest extends MasterNodeOperationRequest { + + private String[] indices; + + private TimeValue timeout = new TimeValue(30, TimeUnit.SECONDS); + + private ClusterHealthStatus waitForStatus; + + private int waitForRelocatingShards = -1; + + ClusterHealthRequest() { + } + + public ClusterHealthRequest(String... indices) { + this.indices = indices; + } + + public String[] indices() { + return indices; + } + + public TimeValue timeout() { + return timeout; + } + + public ClusterHealthRequest timeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + + public ClusterHealthStatus waitForStatus() { + return waitForStatus; + } + + public ClusterHealthRequest waitForStatus(ClusterHealthStatus waitForStatus) { + this.waitForStatus = waitForStatus; + return this; + } + + public ClusterHealthRequest waitForGreenStatus() { + return waitForStatus(ClusterHealthStatus.GREEN); + } + + public ClusterHealthRequest waitForYellowStatus() { + return waitForStatus(ClusterHealthStatus.YELLOW); + } + + public int waitForRelocatingShards() { + return waitForRelocatingShards; + } + + public ClusterHealthRequest waitForRelocatingShards(int waitForRelocatingShards) { + this.waitForRelocatingShards = waitForRelocatingShards; + return this; + } + + @Override public ActionRequestValidationException validate() { + return null; + } + + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + super.readFrom(in); + int 
size = in.readInt(); + if (size == 0) { + indices = Strings.EMPTY_ARRAY; + } else { + indices = new String[size]; + for (int i = 0; i < indices.length; i++) { + indices[i] = in.readUTF(); + } + } + timeout = readTimeValue(in); + if (in.readBoolean()) { + waitForStatus = ClusterHealthStatus.fromValue(in.readByte()); + } + waitForRelocatingShards = in.readInt(); + } + + @Override public void writeTo(DataOutput out) throws IOException { + super.writeTo(out); + if (indices == null) { + out.writeInt(0); + } else { + out.writeInt(indices.length); + for (String index : indices) { + out.writeUTF(index); + } + } + timeout.writeTo(out); + if (waitForStatus == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeByte(waitForStatus.value()); + } + out.writeInt(waitForRelocatingShards); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java new file mode 100644 index 00000000000..fef0102b85a --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java @@ -0,0 +1,121 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.health; + +import com.google.common.collect.Maps; +import org.elasticsearch.action.ActionResponse; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import static org.elasticsearch.action.admin.cluster.health.ClusterIndexHealth.*; + +/** + * @author kimchy (shay.banon) + */ +public class ClusterHealthResponse implements ActionResponse, Iterable { + + private String clusterName; + + int activeShards = 0; + + int relocatingShards = 0; + + int activePrimaryShards = 0; + + boolean timedOut = false; + + ClusterHealthStatus status = ClusterHealthStatus.RED; + + Map indices = Maps.newHashMap(); + + ClusterHealthResponse() { + } + + public ClusterHealthResponse(String clusterName) { + this.clusterName = clusterName; + } + + public String clusterName() { + return clusterName; + } + + public int activeShards() { + return activeShards; + } + + public int relocatingShards() { + return relocatingShards; + } + + public int activePrimaryShards() { + return activePrimaryShards; + } + + /** + * true if the waitForXXX has timed out and did not match. 
+ */ + public boolean timedOut() { + return this.timedOut; + } + + public ClusterHealthStatus status() { + return status; + } + + public Map indices() { + return indices; + } + + @Override public Iterator iterator() { + return indices.values().iterator(); + } + + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + clusterName = in.readUTF(); + activePrimaryShards = in.readInt(); + activeShards = in.readInt(); + relocatingShards = in.readInt(); + status = ClusterHealthStatus.fromValue(in.readByte()); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + ClusterIndexHealth indexHealth = readClusterIndexHealth(in); + indices.put(indexHealth.index(), indexHealth); + } + timedOut = in.readBoolean(); + } + + @Override public void writeTo(DataOutput out) throws IOException { + out.writeUTF(clusterName); + out.writeInt(activePrimaryShards); + out.writeInt(activeShards); + out.writeInt(relocatingShards); + out.writeByte(status.value()); + out.writeInt(indices.size()); + for (ClusterIndexHealth indexHealth : this) { + indexHealth.writeTo(out); + } + out.writeBoolean(timedOut); + } + +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthStatus.java new file mode 100644 index 00000000000..db04512b281 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthStatus.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.health; + +import org.elasticsearch.ElasticSearchIllegalArgumentException; + +/** + * @author kimchy (shay.banon) + */ +public enum ClusterHealthStatus { + GREEN((byte) 0), + YELLOW((byte) 1), + RED((byte) 2); + + private byte value; + + ClusterHealthStatus(byte value) { + this.value = value; + } + + public byte value() { + return value; + } + + public static ClusterHealthStatus fromValue(byte value) { + switch (value) { + case 0: + return GREEN; + case 1: + return YELLOW; + case 2: + return RED; + default: + throw new ElasticSearchIllegalArgumentException("No cluster health status for value [" + value + "]"); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterIndexHealth.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterIndexHealth.java new file mode 100644 index 00000000000..03e1fea96c0 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterIndexHealth.java @@ -0,0 +1,135 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.health; + +import com.google.common.collect.Maps; +import org.elasticsearch.util.io.Streamable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import static org.elasticsearch.action.admin.cluster.health.ClusterShardHealth.*; + +/** + * @author kimchy (shay.banon) + */ +public class ClusterIndexHealth implements Iterable, Streamable { + + private String index; + + private int numberOfShards; + + private int numberOfReplicas; + + int activeShards = 0; + + int relocatingShards = 0; + + int activePrimaryShards = 0; + + ClusterHealthStatus status = ClusterHealthStatus.RED; + + final Map shards = Maps.newHashMap(); + + private ClusterIndexHealth() { + } + + public ClusterIndexHealth(String index, int numberOfShards, int numberOfReplicas) { + this.index = index; + this.numberOfShards = numberOfShards; + this.numberOfReplicas = numberOfReplicas; + } + + public String index() { + return index; + } + + public int numberOfShards() { + return numberOfShards; + } + + public int numberOfReplicas() { + return numberOfReplicas; + } + + public int activeShards() { + return activeShards; + } + + public int relocatingShards() { + return relocatingShards; + } + + public int activePrimaryShards() { + return activePrimaryShards; + } + + public ClusterHealthStatus status() { + return status; + } + + public Map shards() { + return this.shards; + } + + @Override public Iterator iterator() { + return shards.values().iterator(); 
+ } + + public static ClusterIndexHealth readClusterIndexHealth(DataInput in) throws IOException, ClassNotFoundException { + ClusterIndexHealth indexHealth = new ClusterIndexHealth(); + indexHealth.readFrom(in); + return indexHealth; + } + + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + index = in.readUTF(); + numberOfShards = in.readInt(); + numberOfReplicas = in.readInt(); + activePrimaryShards = in.readInt(); + activeShards = in.readInt(); + relocatingShards = in.readInt(); + status = ClusterHealthStatus.fromValue(in.readByte()); + + int size = in.readInt(); + for (int i = 0; i < size; i++) { + ClusterShardHealth shardHealth = readClusterShardHealth(in); + shards.put(shardHealth.id(), shardHealth); + } + } + + @Override public void writeTo(DataOutput out) throws IOException { + out.writeUTF(index); + out.writeInt(numberOfShards); + out.writeInt(numberOfReplicas); + out.writeInt(activePrimaryShards); + out.writeInt(activeShards); + out.writeInt(relocatingShards); + out.writeByte(status.value()); + + out.writeInt(shards.size()); + for (ClusterShardHealth shardHealth : this) { + shardHealth.writeTo(out); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterShardHealth.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterShardHealth.java new file mode 100644 index 00000000000..1583e509cd5 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterShardHealth.java @@ -0,0 +1,92 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.health; + +import org.elasticsearch.util.io.Streamable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +public class ClusterShardHealth implements Streamable { + + private int shardId; + + ClusterHealthStatus status = ClusterHealthStatus.RED; + + int activeShards = 0; + + int relocatingShards = 0; + + boolean primaryActive = false; + + private ClusterShardHealth() { + + } + + ClusterShardHealth(int shardId) { + this.shardId = shardId; + } + + public int id() { + return shardId; + } + + public ClusterHealthStatus status() { + return status; + } + + public int relocatingShards() { + return relocatingShards; + } + + public int activeShards() { + return activeShards; + } + + public boolean primaryActive() { + return primaryActive; + } + + static ClusterShardHealth readClusterShardHealth(DataInput in) throws IOException, ClassNotFoundException { + ClusterShardHealth ret = new ClusterShardHealth(); + ret.readFrom(in); + return ret; + } + + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + shardId = in.readInt(); + status = ClusterHealthStatus.fromValue(in.readByte()); + activeShards = in.readInt(); + relocatingShards = in.readInt(); + primaryActive = in.readBoolean(); + } + + @Override public void writeTo(DataOutput out) throws IOException { + out.writeInt(shardId); + out.writeByte(status.value()); + out.writeInt(activeShards); + out.writeInt(relocatingShards); + 
out.writeBoolean(primaryActive); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java new file mode 100644 index 00000000000..cea3492d4f9 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -0,0 +1,187 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.admin.cluster.health; + +import com.google.inject.Inject; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.TransportActions; +import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.timer.TimerService; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.util.settings.Settings; + +import static org.elasticsearch.action.Actions.*; + +/** + * @author kimchy (shay.banon) + */ +public class TransportClusterHealthAction extends TransportMasterNodeOperationAction { + + private final ClusterName clusterName; + + private final TimerService timerService; + + @Inject public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + TimerService timerService, ClusterName clusterName) { + super(settings, transportService, clusterService, threadPool); + this.clusterName = clusterName; + this.timerService = timerService; + } + + @Override protected String transportAction() { + return TransportActions.Admin.Cluster.HEALTH; + } + + @Override protected ClusterHealthRequest newRequest() { + return new ClusterHealthRequest(); + } + + @Override protected ClusterHealthResponse newResponse() { + return new ClusterHealthResponse(); + } + + @Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request) throws ElasticSearchException { + int waitFor = 2; + if (request.waitForStatus() == null) { + waitFor--; + 
} + if (request.waitForRelocatingShards() == -1) { + waitFor--; + } + if (waitFor == 0) { + // no need to wait for anything + return clusterHealth(request); + } + long endTime = System.currentTimeMillis() + request.timeout().millis(); + while (true) { + int waitForCounter = 0; + ClusterHealthResponse response = clusterHealth(request); + if (request.waitForStatus() != null && response.status() == request.waitForStatus()) { + waitForCounter++; + } + if (request.waitForRelocatingShards() != -1 && response.relocatingShards() <= request.waitForRelocatingShards()) { + waitForCounter++; + } + if (waitForCounter == waitFor) { + return response; + } + if (timerService.estimatedTimeInMillis() > endTime) { + response.timedOut = true; + return response; + } + try { + Thread.sleep(200); + } catch (InterruptedException e) { + response.timedOut = true; + // we got interrupted, bail + return response; + } + } + } + + private ClusterHealthResponse clusterHealth(ClusterHealthRequest request) { + ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value()); + ClusterState clusterState = clusterService.state(); + String[] indices = processIndices(clusterState, request.indices()); + for (String index : indices) { + IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index); + IndexMetaData indexMetaData = clusterState.metaData().index(index); + if (indexRoutingTable == null) { + continue; + } + ClusterIndexHealth indexHealth = new ClusterIndexHealth(index, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas()); + + for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { + ClusterShardHealth shardHealth = new ClusterShardHealth(shardRoutingTable.shardId().id()); + for (ShardRouting shardRouting : shardRoutingTable) { + if (shardRouting.active()) { + shardHealth.activeShards++; + if (shardRouting.relocating()) { + // the shard is relocating, the one he is relocating to will be in initializing state, so we don't count it + 
shardHealth.relocatingShards++; + } + if (shardRouting.primary()) { + shardHealth.primaryActive = true; + } + } + } + if (shardHealth.primaryActive) { + if (shardHealth.activeShards == shardRoutingTable.size()) { + shardHealth.status = ClusterHealthStatus.GREEN; + } else { + shardHealth.status = ClusterHealthStatus.YELLOW; + } + } else { + shardHealth.status = ClusterHealthStatus.RED; + } + indexHealth.shards.put(shardHealth.id(), shardHealth); + } + + for (ClusterShardHealth shardHealth : indexHealth) { + if (shardHealth.primaryActive()) { + indexHealth.activePrimaryShards++; + } + indexHealth.activeShards += shardHealth.activeShards; + indexHealth.relocatingShards += shardHealth.relocatingShards; + } + // update the index status + indexHealth.status = ClusterHealthStatus.GREEN; + for (ClusterShardHealth shardHealth : indexHealth) { + if (shardHealth.status() == ClusterHealthStatus.RED) { + indexHealth.status = ClusterHealthStatus.RED; + break; + } + if (shardHealth.status() == ClusterHealthStatus.YELLOW) { + indexHealth.status = ClusterHealthStatus.YELLOW; + } + } + + response.indices.put(indexHealth.index(), indexHealth); + } + + for (ClusterIndexHealth indexHealth : response) { + response.activePrimaryShards += indexHealth.activePrimaryShards; + response.activeShards += indexHealth.activeShards; + response.relocatingShards += indexHealth.relocatingShards; + } + + response.status = ClusterHealthStatus.GREEN; + for (ClusterIndexHealth indexHealth : response) { + if (indexHealth.status() == ClusterHealthStatus.RED) { + response.status = ClusterHealthStatus.RED; + break; + } + if (indexHealth.status() == ClusterHealthStatus.YELLOW) { + response.status = ClusterHealthStatus.YELLOW; + } + } + + + return response; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastOperationRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastOperationRequest.java index 
4c18c13cf5c..bcfac50ddf0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastOperationRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastOperationRequest.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.support.broadcast; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.util.Nullable; +import org.elasticsearch.util.Strings; import java.io.DataInput; import java.io.DataOutput; @@ -78,9 +79,13 @@ public abstract class BroadcastOperationRequest implements ActionRequest { } @Override public void writeTo(DataOutput out) throws IOException { - out.writeInt(indices.length); - for (String index : indices) { - out.writeUTF(index); + if (indices == null) { + out.writeInt(0); + } else { + out.writeInt(indices.length); + for (String index : indices) { + out.writeUTF(index); + } } if (queryHint == null) { out.writeBoolean(false); @@ -92,9 +97,14 @@ public abstract class BroadcastOperationRequest implements ActionRequest { } @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - indices = new String[in.readInt()]; - for (int i = 0; i < indices.length; i++) { - indices[i] = in.readUTF(); + int size = in.readInt(); + if (size == 0) { + indices = Strings.EMPTY_ARRAY; + } else { + indices = new String[size]; + for (int i = 0; i < indices.length; i++) { + indices[i] = in.readUTF(); + } } if (in.readBoolean()) { queryHint = in.readUTF(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java index 601fcca5700..76e413c9b4c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java @@ -21,6 +21,8 
@@ package org.elasticsearch.client; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest; @@ -40,6 +42,34 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; */ public interface ClusterAdminClient { + /** + * The health of the cluster. + * + * @param request The cluster health request + * @return The result future + * @see Requests#clusterHealth(String...) + */ + ActionFuture health(ClusterHealthRequest request); + + /** + * The health of the cluster. + * + * @param request The cluster health request + * @param listener A listener to be notified with a result + * @return The result future + * @see Requests#clusterHealth(String...) + */ + ActionFuture health(ClusterHealthRequest request, ActionListener listener); + + /** + * The health of the cluster. + * + * @param request The cluster health request + * @param listener A listener to be notified with a result + * @see Requests#clusterHealth(String...) + */ + void execHealth(ClusterHealthRequest request, ActionListener listener); + /** * The state of the cluster.
* diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java index 63e7ff5c7e4..03e7286774e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java @@ -19,6 +19,7 @@ package org.elasticsearch.client; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest; import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRequest; @@ -233,6 +234,17 @@ public class Requests { return new ClusterStateRequest(); } + /** + * Creates a cluster health request. + * + * @param indices The indices to check the health of. Use null or _all to execute against all indices + * @return The cluster health request + * @see org.elasticsearch.client.ClusterAdminClient#health(org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest) + */ + public static ClusterHealthRequest clusterHealth(String... indices) { + return new ClusterHealthRequest(indices); + } + /** * Creates a nodes info request against all the nodes.
* diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/server/ServerClusterAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/server/ServerClusterAdminClient.java index 3d73d44600f..ae37f13ed0d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/server/ServerClusterAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/server/ServerClusterAdminClient.java @@ -22,6 +22,9 @@ package org.elasticsearch.client.server; import com.google.inject.Inject; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo; @@ -46,6 +49,8 @@ import org.elasticsearch.util.settings.Settings; */ public class ServerClusterAdminClient extends AbstractComponent implements ClusterAdminClient { + private final TransportClusterHealthAction clusterHealthAction; + private final TransportClusterStateAction clusterStateAction; private final TransportSinglePingAction singlePingAction; @@ -57,10 +62,11 @@ public class ServerClusterAdminClient extends AbstractComponent implements Clust private final TransportNodesInfo nodesInfo; @Inject public ServerClusterAdminClient(Settings settings, - TransportClusterStateAction clusterStateAction, + TransportClusterHealthAction clusterHealthAction, TransportClusterStateAction clusterStateAction, TransportSinglePingAction singlePingAction, TransportBroadcastPingAction broadcastPingAction, TransportReplicationPingAction replicationPingAction, TransportNodesInfo nodesInfo) { 
super(settings); + this.clusterHealthAction = clusterHealthAction; this.clusterStateAction = clusterStateAction; this.nodesInfo = nodesInfo; this.singlePingAction = singlePingAction; @@ -68,6 +74,18 @@ public class ServerClusterAdminClient extends AbstractComponent implements Clust this.replicationPingAction = replicationPingAction; } + @Override public ActionFuture health(ClusterHealthRequest request) { + return clusterHealthAction.submit(request); + } + + @Override public ActionFuture health(ClusterHealthRequest request, ActionListener listener) { + return clusterHealthAction.submit(request, listener); + } + + @Override public void execHealth(ClusterHealthRequest request, ActionListener listener) { + clusterHealthAction.execute(request, listener); + } + @Override public ActionFuture state(ClusterStateRequest request) { return clusterStateAction.submit(request); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java index de41f475bfe..e50ecec39f6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java @@ -20,6 +20,7 @@ package org.elasticsearch.client.transport.action; import com.google.inject.AbstractModule; +import org.elasticsearch.client.transport.action.admin.cluster.health.ClientTransportClusterHealthAction; import org.elasticsearch.client.transport.action.admin.cluster.node.info.ClientTransportNodesInfoAction; import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction; import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction; @@ -43,7 +44,7 @@ import 
org.elasticsearch.client.transport.action.search.ClientTransportSearchScr import org.elasticsearch.client.transport.action.terms.ClientTransportTermsAction; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class ClientTransportActionModule extends AbstractModule { @@ -70,5 +71,6 @@ public class ClientTransportActionModule extends AbstractModule { bind(ClientTransportReplicationPingAction.class).asEagerSingleton(); bind(ClientTransportBroadcastPingAction.class).asEagerSingleton(); bind(ClientTransportClusterStateAction.class).asEagerSingleton(); + bind(ClientTransportClusterHealthAction.class).asEagerSingleton(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/health/ClientTransportClusterHealthAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/health/ClientTransportClusterHealthAction.java new file mode 100644 index 00000000000..c46aef3ec1d --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/health/ClientTransportClusterHealthAction.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.client.transport.action.admin.cluster.health; + +import com.google.inject.Inject; +import org.elasticsearch.action.TransportActions; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.client.transport.action.support.BaseClientTransportAction; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.util.settings.Settings; + +/** + * @author kimchy (Shay Banon) + */ +public class ClientTransportClusterHealthAction extends BaseClientTransportAction { + + @Inject public ClientTransportClusterHealthAction(Settings settings, TransportService transportService) { + super(settings, transportService, ClusterHealthResponse.class); + } + + @Override protected String action() { + return TransportActions.Admin.Cluster.HEALTH; + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java index 8e19e73acd6..3df5ca0423d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java @@ -23,6 +23,8 @@ import com.google.inject.Inject; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import 
org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest; @@ -35,6 +37,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.ClusterAdminClient; import org.elasticsearch.client.transport.TransportClientNodesService; +import org.elasticsearch.client.transport.action.admin.cluster.health.ClientTransportClusterHealthAction; import org.elasticsearch.client.transport.action.admin.cluster.node.info.ClientTransportNodesInfoAction; import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction; import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction; @@ -51,6 +54,8 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple private final TransportClientNodesService nodesService; + private final ClientTransportClusterHealthAction clusterHealthAction; + private final ClientTransportClusterStateAction clusterStateAction; private final ClientTransportSinglePingAction singlePingAction; @@ -62,11 +67,12 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple private final ClientTransportNodesInfoAction nodesInfoAction; @Inject public InternalTransportClusterAdminClient(Settings settings, TransportClientNodesService nodesService, - ClientTransportClusterStateAction clusterStateAction, + ClientTransportClusterHealthAction clusterHealthAction, ClientTransportClusterStateAction clusterStateAction, ClientTransportSinglePingAction singlePingAction, ClientTransportReplicationPingAction replicationPingAction, ClientTransportBroadcastPingAction broadcastPingAction, ClientTransportNodesInfoAction nodesInfoAction) { super(settings); this.nodesService = nodesService; + this.clusterHealthAction = clusterHealthAction; this.clusterStateAction = clusterStateAction; this.nodesInfoAction = 
nodesInfoAction; this.singlePingAction = singlePingAction; @@ -74,6 +80,31 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple this.broadcastPingAction = broadcastPingAction; } + @Override public ActionFuture health(final ClusterHealthRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return clusterHealthAction.submit(node, request); + } + }); + } + + @Override public ActionFuture health(final ClusterHealthRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return clusterHealthAction.submit(node, request, listener); + } + }); + } + + @Override public void execHealth(final ClusterHealthRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + clusterHealthAction.execute(node, request, listener); + return null; + } + }); + } + @Override public ActionFuture state(final ClusterStateRequest request) { return nodesService.execute(new TransportClientNodesService.NodeCallback>() { @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/DefaultShardsRoutingStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/DefaultShardsRoutingStrategy.java index 5728061ac90..c231aafe707 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/DefaultShardsRoutingStrategy.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/strategy/DefaultShardsRoutingStrategy.java @@ -357,7 +357,6 @@ public class 
DefaultShardsRoutingStrategy implements ShardsRoutingStrategy { shard.deassignNode(); shards.remove(); } else { - assert shard.state() == ShardRoutingState.RELOCATING; shard.cancelRelocation(); } break; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 1e0eba8ca9a..0e521969e62 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -145,18 +145,24 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent { // do not snapshot when in the process of relocation of primaries so we won't get conflicts return; } - indexShard.snapshot(new Engine.SnapshotHandler() { - @Override public void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException { - if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogSize != translogSnapshot.size()) { + try { + indexShard.snapshot(new Engine.SnapshotHandler() { + @Override public void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException { + if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogSize != translogSnapshot.size()) { - shardGateway.snapshot(snapshotIndexCommit, translogSnapshot); + shardGateway.snapshot(snapshotIndexCommit, translogSnapshot); - lastIndexVersion = snapshotIndexCommit.getVersion(); - lastTranslogId = translogSnapshot.translogId(); - lastTranslogSize = translogSnapshot.size(); + lastIndexVersion = snapshotIndexCommit.getVersion(); + lastTranslogId = translogSnapshot.translogId(); + lastTranslogSize = translogSnapshot.size(); + } } 
- } - }); + }); + } catch (IllegalIndexShardStateException e) { + // ignore, that's fine + } catch (Exception e) { + logger.warn("Failed to snapshot on close", e); + } } public void close() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java index fb2516d00e0..117db078d75 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java @@ -48,7 +48,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.*; @@ -77,6 +79,16 @@ public class RecoveryAction extends AbstractIndexShardComponent { private final String snapshotTransportAction; + private volatile boolean closed = false; + + private volatile Thread sendStartRecoveryThread; + + private volatile Thread receiveSnapshotRecoveryThread; + + private volatile Thread sendSnapshotRecoveryThread; + + private final CopyOnWriteArrayList sendFileChunksRecoveryFutures = new CopyOnWriteArrayList(); + @Inject public RecoveryAction(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, TransportService transportService, IndexShard indexShard, Store store) { super(shardId, indexSettings); this.threadPool = threadPool; @@ -96,64 +108,97 @@ public class RecoveryAction extends AbstractIndexShardComponent { } public void close() { + closed = true; transportService.removeHandler(startTransportAction); transportService.removeHandler(fileChunkTransportAction); transportService.removeHandler(snapshotTransportAction); 
cleanOpenIndex(); + + if (true) { + // disable the interruptions for now + return; + } + + // interrupt the startRecovery thread if its performing recovery + if (sendStartRecoveryThread != null) { + sendStartRecoveryThread.interrupt(); + } + if (receiveSnapshotRecoveryThread != null) { + receiveSnapshotRecoveryThread.interrupt(); + } + if (sendSnapshotRecoveryThread != null) { + sendSnapshotRecoveryThread.interrupt(); + } + for (Future future : sendFileChunksRecoveryFutures) { + future.cancel(true); + } } public synchronized void startRecovery(Node node, Node targetNode, boolean markAsRelocated) throws ElasticSearchException { - // mark the shard as recovering - IndexShardState preRecoveringState; + sendStartRecoveryThread = Thread.currentThread(); try { - preRecoveringState = indexShard.recovering(); - } catch (IndexShardRecoveringException e) { - // that's fine, since we might be called concurrently, just ignore this, we are already recovering - throw new IgnoreRecoveryException("Already in recovering process", e); - } catch (IndexShardStartedException e) { - // that's fine, since we might be called concurrently, just ignore this, we are already started - throw new IgnoreRecoveryException("Already in recovering process", e); - } catch (IndexShardRelocatedException e) { - // that's fine, since we might be called concurrently, just ignore this, we are already relocated - throw new IgnoreRecoveryException("Already in recovering process", e); - } catch (IndexShardClosedException e) { - throw new IgnoreRecoveryException("can't recover a closed shard.", e); - } - logger.debug("Starting recovery from {}", targetNode); - StopWatch stopWatch = new StopWatch().start(); - try { - RecoveryStatus recoveryStatus = transportService.submitRequest(targetNode, startTransportAction, new StartRecoveryRequest(node, markAsRelocated), new FutureTransportResponseHandler() { - @Override public RecoveryStatus newInstance() { - return new RecoveryStatus(); + // mark the shard as recovering 
+ IndexShardState preRecoveringState; + try { + preRecoveringState = indexShard.recovering(); + } catch (IndexShardRecoveringException e) { + // that's fine, since we might be called concurrently, just ignore this, we are already recovering + throw new IgnoreRecoveryException("Already in recovering process", e); + } catch (IndexShardStartedException e) { + // that's fine, since we might be called concurrently, just ignore this, we are already started + throw new IgnoreRecoveryException("Already in recovering process", e); + } catch (IndexShardRelocatedException e) { + // that's fine, since we might be called concurrently, just ignore this, we are already relocated + throw new IgnoreRecoveryException("Already in recovering process", e); + } catch (IndexShardClosedException e) { + throw new IgnoreRecoveryException("Can't recover a closed shard.", e); + } + logger.debug("Starting recovery from {}", targetNode); + StopWatch stopWatch = new StopWatch().start(); + try { + if (closed) { + throw new IgnoreRecoveryException("Recovery closed"); } - }).txGet(); - stopWatch.stop(); - if (logger.isDebugEnabled()) { - StringBuilder sb = new StringBuilder(); - sb.append("Recovery completed from ").append(targetNode).append(", took [").append(stopWatch.totalTime()).append("]\n"); - sb.append(" Phase1: recovered [").append(recoveryStatus.phase1FileNames.size()).append("]") - .append(" files with total size of [").append(new SizeValue(recoveryStatus.phase1TotalSize)).append("]") - .append(", took [").append(new TimeValue(recoveryStatus.phase1Time, MILLISECONDS)).append("]") - .append("\n"); - sb.append(" Phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations") - .append(", took [").append(new TimeValue(recoveryStatus.phase2Time, MILLISECONDS)).append("]") - .append("\n"); - sb.append(" Phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations") - .append(", took [").append(new 
TimeValue(recoveryStatus.phase3Time, MILLISECONDS)).append("]"); - logger.debug(sb.toString()); + RecoveryStatus recoveryStatus = transportService.submitRequest(targetNode, startTransportAction, new StartRecoveryRequest(node, markAsRelocated), new FutureTransportResponseHandler() { + @Override public RecoveryStatus newInstance() { + return new RecoveryStatus(); + } + }).txGet(); + stopWatch.stop(); + if (logger.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append("Recovery completed from ").append(targetNode).append(", took [").append(stopWatch.totalTime()).append("]\n"); + sb.append(" Phase1: recovered [").append(recoveryStatus.phase1FileNames.size()).append("]") + .append(" files with total size of [").append(new SizeValue(recoveryStatus.phase1TotalSize)).append("]") + .append(", took [").append(new TimeValue(recoveryStatus.phase1Time, MILLISECONDS)).append("]") + .append("\n"); + sb.append(" Phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations") + .append(", took [").append(new TimeValue(recoveryStatus.phase2Time, MILLISECONDS)).append("]") + .append("\n"); + sb.append(" Phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations") + .append(", took [").append(new TimeValue(recoveryStatus.phase3Time, MILLISECONDS)).append("]"); + logger.debug(sb.toString()); + } + } catch (RemoteTransportException e) { + if (closed) { + throw new IgnoreRecoveryException("Recovery closed", e); + } + Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof ActionNotFoundTransportException || cause instanceof IndexShardNotStartedException) { + // the remote shard has not yet registered the action or not started yet, we need to ignore this recovery attempt, and restore the state previous to recovering + indexShard.restoreRecoveryState(preRecoveringState); + throw new IgnoreRecoveryException("Ignoring recovery attempt, remote shard not 
started", e); + } + throw new RecoveryFailedException(shardId, node, targetNode, e); + } catch (Exception e) { + if (closed) { + throw new IgnoreRecoveryException("Recovery closed", e); + } + throw new RecoveryFailedException(shardId, node, targetNode, e); } - } catch (RemoteTransportException e) { - Throwable cause = ExceptionsHelper.unwrapCause(e); - if (cause instanceof ActionNotFoundTransportException || - cause instanceof IndexShardNotStartedException) { - // the remote shard has not yet registered the action or not started yet, we need to ignore this recovery attempt, and restore the state previous to recovering - indexShard.restoreRecoveryState(preRecoveringState); - throw new IgnoreRecoveryException("Ignoring recovery attempt, remote shard not started", e); - } - throw new RecoveryFailedException(shardId, node, targetNode, e); - } catch (Exception e) { - throw new RecoveryFailedException(shardId, node, targetNode, e); + } finally { + sendStartRecoveryThread = null; } } @@ -226,7 +271,7 @@ public class RecoveryAction extends AbstractIndexShardComponent { final CountDownLatch latch = new CountDownLatch(snapshot.getFiles().length); final AtomicReference lastException = new AtomicReference(); for (final String name : snapshot.getFiles()) { - threadPool.execute(new Runnable() { + sendFileChunksRecoveryFutures.add(threadPool.submit(new Runnable() { @Override public void run() { IndexInput indexInput = null; try { @@ -256,7 +301,7 @@ public class RecoveryAction extends AbstractIndexShardComponent { latch.countDown(); } } - }); + })); } latch.await(); @@ -270,31 +315,49 @@ public class RecoveryAction extends AbstractIndexShardComponent { recoveryStatus.phase1Time = stopWatch.totalTime().millis(); } catch (Throwable e) { throw new RecoverFilesRecoveryException(shardId, snapshot.getFiles().length, new SizeValue(totalSize), e); + } finally { + sendFileChunksRecoveryFutures.clear(); } } @Override public void phase2(Translog.Snapshot snapshot) throws 
ElasticSearchException { - logger.trace("Recovery [phase2] to {}: sending [{}] transaction log operations", node, snapshot.size()); - StopWatch stopWatch = new StopWatch().start(); - sendSnapshot(snapshot, false); - stopWatch.stop(); - logger.trace("Recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime()); - recoveryStatus.phase2Time = stopWatch.totalTime().millis(); - recoveryStatus.phase2Operations = snapshot.size(); + sendSnapshotRecoveryThread = Thread.currentThread(); + try { + if (closed) { + throw new IndexShardClosedException(shardId); + } + logger.trace("Recovery [phase2] to {}: sending [{}] transaction log operations", node, snapshot.size()); + StopWatch stopWatch = new StopWatch().start(); + sendSnapshot(snapshot, false); + stopWatch.stop(); + logger.trace("Recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime()); + recoveryStatus.phase2Time = stopWatch.totalTime().millis(); + recoveryStatus.phase2Operations = snapshot.size(); + } finally { + sendSnapshotRecoveryThread = null; + } } @Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException { - logger.trace("Recovery [phase3] to {}: sending [{}] transaction log operations", node, snapshot.size()); - StopWatch stopWatch = new StopWatch().start(); - sendSnapshot(snapshot, true); - if (startRecoveryRequest.markAsRelocated) { - // TODO what happens if the recovery process fails afterwards, we need to mark this back to started - indexShard.relocated(); + sendSnapshotRecoveryThread = Thread.currentThread(); + try { + if (closed) { + throw new IndexShardClosedException(shardId); + } + logger.trace("Recovery [phase3] to {}: sending [{}] transaction log operations", node, snapshot.size()); + StopWatch stopWatch = new StopWatch().start(); + sendSnapshot(snapshot, true); + if (startRecoveryRequest.markAsRelocated) { + // TODO what happens if the recovery process fails afterwards, we need to mark this back to started + indexShard.relocated(); + } + stopWatch.stop(); + 
logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime()); + recoveryStatus.phase3Time = stopWatch.totalTime().millis(); + recoveryStatus.phase3Operations = snapshot.size(); + } finally { + sendSnapshotRecoveryThread = null; } - stopWatch.stop(); - logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime()); - recoveryStatus.phase3Time = stopWatch.totalTime().millis(); - recoveryStatus.phase3Operations = snapshot.size(); } private void sendSnapshot(Translog.Snapshot snapshot, boolean phase3) throws ElasticSearchException { @@ -371,16 +434,24 @@ public class RecoveryAction extends AbstractIndexShardComponent { } @Override public void messageReceived(SnapshotWrapper snapshot, TransportChannel channel) throws Exception { - if (!snapshot.phase3) { - // clean open index outputs in any case (there should not be any open, we close then in the chunk) - cleanOpenIndex(); + receiveSnapshotRecoveryThread = Thread.currentThread(); + try { + if (closed) { + throw new IndexShardClosedException(shardId); + } + if (!snapshot.phase3) { + // clean open index outputs in any case (there should not be any open, we close then in the chunk) + cleanOpenIndex(); + } + indexShard.performRecovery(snapshot.snapshot, snapshot.phase3); + if (snapshot.phase3) { + indexShard.refresh(new Engine.Refresh(true)); + // probably need to do more here... + } + channel.sendResponse(VoidStreamable.INSTANCE); + } finally { + receiveSnapshotRecoveryThread = null; } - indexShard.performRecovery(snapshot.snapshot, snapshot.phase3); - if (snapshot.phase3) { - indexShard.refresh(new Engine.Refresh(true)); - // probably need to do more here... 
- } - channel.sendResponse(VoidStreamable.INSTANCE); } } @@ -417,6 +488,9 @@ public class RecoveryAction extends AbstractIndexShardComponent { } @Override public void messageReceived(FileChunk request, TransportChannel channel) throws Exception { + if (closed) { + throw new IndexShardClosedException(shardId); + } IndexOutput indexOutput; if (request.position == 0) { // first request diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index 08bc56e2612..a063a0c1bdf 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -20,6 +20,7 @@ package org.elasticsearch.rest.action; import com.google.inject.AbstractModule; +import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction; import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction; import org.elasticsearch.rest.action.admin.cluster.ping.broadcast.RestBroadcastPingAction; import org.elasticsearch.rest.action.admin.cluster.ping.replication.RestReplicationPingAction; @@ -52,6 +53,7 @@ public class RestActionModule extends AbstractModule { bind(RestNodesInfoAction.class).asEagerSingleton(); bind(RestClusterStateAction.class).asEagerSingleton(); + bind(RestClusterHealthAction.class).asEagerSingleton(); bind(RestSinglePingAction.class).asEagerSingleton(); bind(RestBroadcastPingAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java new file mode 100644 index 00000000000..3c6d710a190 --- /dev/null +++ 
b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java @@ -0,0 +1,142 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster.health; + +import com.google.inject.Inject; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.*; +import org.elasticsearch.client.Client; +import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestActions; +import org.elasticsearch.rest.action.support.RestJsonBuilder; +import org.elasticsearch.util.json.JsonBuilder; +import org.elasticsearch.util.settings.Settings; + +import java.io.IOException; + +import static org.elasticsearch.client.Requests.*; +import static org.elasticsearch.rest.RestResponse.Status.*; + +/** + * @author kimchy (shay.banon) + */ +public class RestClusterHealthAction extends BaseRestHandler { + + @Inject public RestClusterHealthAction(Settings settings, Client client, RestController controller) { + super(settings, client); + + controller.registerHandler(RestRequest.Method.GET, "/_cluster/health", this); + controller.registerHandler(RestRequest.Method.GET, 
"/_cluster/health/{index}", this); + } + + @Override public void handleRequest(final RestRequest request, final RestChannel channel) { + ClusterHealthRequest clusterHealthRequest = clusterHealth(RestActions.splitIndices(request.param("index"))); + int level = 0; + try { + clusterHealthRequest.timeout(request.paramAsTime("timeout", clusterHealthRequest.timeout())); + String waitForStatus = request.param("waitForStatus"); + if (waitForStatus != null) { + clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase())); + } + clusterHealthRequest.waitForRelocatingShards(request.paramAsInt("waitForRelocatingShards", clusterHealthRequest.waitForRelocatingShards())); + String sLevel = request.param("level"); + if (sLevel != null) { + if ("cluster".equals(sLevel)) { + level = 0; + } else if ("indices".equals(sLevel)) { + level = 1; + } else if ("shards".equals(sLevel)) { + level = 2; + } + } + } catch (Exception e) { + try { + JsonBuilder builder = RestJsonBuilder.restJsonBuilder(request); + channel.sendResponse(new JsonRestResponse(request, PRECONDITION_FAILED, builder.startObject().field("error", e.getMessage()).endObject())); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + return; + } + final int fLevel = level; + client.admin().cluster().execHealth(clusterHealthRequest, new ActionListener<ClusterHealthResponse>() { + @Override public void onResponse(ClusterHealthResponse response) { + try { + JsonBuilder builder = RestJsonBuilder.restJsonBuilder(request); + builder.startObject(); + + builder.field("status", response.status().name().toLowerCase()); + builder.field("timedOut", response.timedOut()); + builder.field("activePrimaryShards", response.activePrimaryShards()); + builder.field("activeShards", response.activeShards()); + builder.field("relocatingShards", response.relocatingShards()); + + if (fLevel > 0) { + builder.startObject("indices"); + for (ClusterIndexHealth indexHealth : response) { +
builder.startObject(indexHealth.index()); + + builder.field("status", indexHealth.status().name().toLowerCase()); + builder.field("numberOfShards", indexHealth.numberOfShards()); + builder.field("numberOfReplicas", indexHealth.numberOfReplicas()); + builder.field("activePrimaryShards", indexHealth.activePrimaryShards()); + builder.field("activeShards", indexHealth.activeShards()); + builder.field("relocatingShards", indexHealth.relocatingShards()); + + if (fLevel > 1) { + builder.startObject("shards"); + + for (ClusterShardHealth shardHealth : indexHealth) { + builder.startObject(Integer.toString(shardHealth.id())); + + builder.field("status", shardHealth.status().name().toLowerCase()); + builder.field("primaryActive", shardHealth.primaryActive()); + builder.field("activeShards", shardHealth.activeShards()); + builder.field("relocatingShards", shardHealth.relocatingShards()); + + builder.endObject(); + } + + builder.endObject(); + } + + builder.endObject(); + } + builder.endObject(); + } + + builder.endObject(); + + channel.sendResponse(new JsonRestResponse(request, RestResponse.Status.OK, builder)); + } catch (Exception e) { + onFailure(e); + } + } + + @Override public void onFailure(Throwable e) { + try { + channel.sendResponse(new JsonThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } +} diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java index 015d1c46abf..c19d3b2211c 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java @@ -19,6 +19,10 @@ package 
org.elasticsearch.test.integration.indexlifecycle; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.RoutingNode; @@ -61,9 +65,18 @@ public class IndexLifecycleActionTests extends AbstractServersTests { ClusterService clusterService1 = ((InternalServer) server("server1")).injector().getInstance(ClusterService.class); logger.info("Creating index [test]"); - client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); + CreateIndexResponse createIndexResponse = client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); + assertThat(createIndexResponse.acknowledged(), equalTo(true)); + + logger.info("Running Cluster Health"); + ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); + + // sleep till the cluster state gets published, since we check the master + Thread.sleep(200); - Thread.sleep(1000); ClusterState clusterState1 = clusterService1.state(); RoutingNode routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); @@ -79,7 +92,14 @@ public class IndexLifecycleActionTests extends AbstractServersTests { ClusterService clusterService2 = ((InternalServer) server("server2")).injector().getInstance(ClusterService.class); - Thread.sleep(1500); + logger.info("Running Cluster Health"); + clusterHealth = 
client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + + // sleep till the cluster state gets published, since we check the master + Thread.sleep(200); clusterState1 = clusterService1.state(); routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); @@ -95,7 +115,17 @@ public class IndexLifecycleActionTests extends AbstractServersTests { ClusterService clusterService3 = ((InternalServer) server("server3")).injector().getInstance(ClusterService.class); - Thread.sleep(1500); + logger.info("Running Cluster Health"); + clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(clusterHealth.relocatingShards(), equalTo(0)); + assertThat(clusterHealth.activeShards(), equalTo(22)); + assertThat(clusterHealth.activePrimaryShards(), equalTo(11)); + + // sleep till the cluster state gets published, since we check the master + Thread.sleep(200); clusterState1 = clusterService1.state(); routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); @@ -114,8 +144,20 @@ public class IndexLifecycleActionTests extends AbstractServersTests { logger.info("Closing server1"); // kill the first server closeServer("server1"); + // wait a bit so it will be discovered as removed + Thread.sleep(200); + // verify health + logger.info("Running Cluster Health"); + clusterHealth = 
client("server2").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(clusterHealth.relocatingShards(), equalTo(0)); + assertThat(clusterHealth.activeShards(), equalTo(22)); + assertThat(clusterHealth.activePrimaryShards(), equalTo(11)); - Thread.sleep(1500); + // sleep till the cluster state gets published, since we check the master + Thread.sleep(200); clusterState2 = clusterService2.state(); routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); @@ -129,9 +171,10 @@ public class IndexLifecycleActionTests extends AbstractServersTests { logger.info("Deleting index [test]"); // last, lets delete the index - client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet(); + DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet(); + assertThat(deleteIndexResponse.acknowledged(), equalTo(true)); - Thread.sleep(1500); + Thread.sleep(200); clusterState2 = clusterService2.state(); routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2, nullValue()); @@ -155,9 +198,17 @@ public class IndexLifecycleActionTests extends AbstractServersTests { ClusterService clusterService1 = ((InternalServer) server("server1")).injector().getInstance(ClusterService.class); logger.info("Creating index [test]"); - client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); + CreateIndexResponse createIndexResponse = client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); + assertThat(createIndexResponse.acknowledged(), equalTo(true)); - Thread.sleep(1000); + 
logger.info("Running Cluster Health"); + ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(clusterHealth.relocatingShards(), equalTo(0)); + assertThat(clusterHealth.activeShards(), equalTo(11)); + assertThat(clusterHealth.activePrimaryShards(), equalTo(11)); ClusterState clusterState1 = clusterService1.state(); RoutingNode routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); @@ -166,11 +217,23 @@ public class IndexLifecycleActionTests extends AbstractServersTests { // start another server logger.info("Starting server2"); startServer("server2", settings); + // wait a bit + Thread.sleep(200); + + logger.info("Running Cluster Health"); + clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(clusterHealth.relocatingShards(), equalTo(0)); + assertThat(clusterHealth.activeShards(), equalTo(11)); + assertThat(clusterHealth.activePrimaryShards(), equalTo(11)); + + // sleep till the cluster state gets published, since we check the master + Thread.sleep(200); ClusterService clusterService2 = ((InternalServer) server("server2")).injector().getInstance(ClusterService.class); - Thread.sleep(2000); - clusterState1 = clusterService1.state(); routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), anyOf(equalTo(6), equalTo(5))); @@ 
-182,10 +245,22 @@ public class IndexLifecycleActionTests extends AbstractServersTests { // start another server logger.info("Starting server3"); startServer("server3"); + // wait a bit so assignment will start + Thread.sleep(200); ClusterService clusterService3 = ((InternalServer) server("server3")).injector().getInstance(ClusterService.class); - Thread.sleep(1500); + logger.info("Running Cluster Health"); + clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(clusterHealth.relocatingShards(), equalTo(0)); + assertThat(clusterHealth.activeShards(), equalTo(11)); + assertThat(clusterHealth.activePrimaryShards(), equalTo(11)); + + // sleep till the cluster state gets published, since we check the master + Thread.sleep(200); clusterState1 = clusterService1.state(); routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId()); @@ -204,8 +279,20 @@ public class IndexLifecycleActionTests extends AbstractServersTests { logger.info("Closing server1"); // kill the first server closeServer("server1"); + // wait a bit so it will be discovered as removed + Thread.sleep(200); - Thread.sleep(2000); + logger.info("Running Cluster Health"); + clusterHealth = client("server3").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(clusterHealth.relocatingShards(), equalTo(0)); + assertThat(clusterHealth.activeShards(), equalTo(11)); + assertThat(clusterHealth.activePrimaryShards(), 
equalTo(11)); + + // sleep till the cluster state gets published, since we check the master + Thread.sleep(200); clusterState2 = clusterService2.state(); routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); @@ -219,9 +306,9 @@ public class IndexLifecycleActionTests extends AbstractServersTests { logger.info("Deleting index [test]"); // last, lets delete the index - client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet(); + DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet(); + assertThat(deleteIndexResponse.acknowledged(), equalTo(true)); - Thread.sleep(2000); clusterState2 = clusterService2.state(); routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId()); assertThat(routingNodeEntry2, nullValue());