start work on cluster health api, still needs some polishing, relates to #42.

kimchy 2010-02-24 23:16:01 +02:00
parent e3af8716ed
commit da510f28ab
21 changed files with 1290 additions and 109 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action;
import com.google.inject.AbstractModule;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo;
import org.elasticsearch.action.admin.cluster.ping.broadcast.TransportBroadcastPingAction;
import org.elasticsearch.action.admin.cluster.ping.replication.TransportIndexReplicationPingAction;
@ -58,6 +59,7 @@ public class TransportActionModule extends AbstractModule {
bind(TransportNodesInfo.class).asEagerSingleton();
bind(TransportClusterStateAction.class).asEagerSingleton();
bind(TransportClusterHealthAction.class).asEagerSingleton();
bind(TransportSinglePingAction.class).asEagerSingleton();
bind(TransportBroadcastPingAction.class).asEagerSingleton();

View File

@ -62,6 +62,7 @@ public class TransportActions {
public static class Cluster {
public static final String STATE = "/cluster/state";
public static final String HEALTH = "/cluster/health";
public static class Node {
public static final String INFO = "/cluster/nodes/info";

View File

@ -0,0 +1,134 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.health;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.util.Strings;
import org.elasticsearch.util.TimeValue;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.util.TimeValue.*;
/**
* @author kimchy (shay.banon)
*/
public class ClusterHealthRequest extends MasterNodeOperationRequest {
private String[] indices;
private TimeValue timeout = new TimeValue(30, TimeUnit.SECONDS);
private ClusterHealthStatus waitForStatus;
private int waitForRelocatingShards = -1;
ClusterHealthRequest() {
}
public ClusterHealthRequest(String... indices) {
this.indices = indices;
}
public String[] indices() {
return indices;
}
public TimeValue timeout() {
return timeout;
}
public ClusterHealthRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
public ClusterHealthStatus waitForStatus() {
return waitForStatus;
}
public ClusterHealthRequest waitForStatus(ClusterHealthStatus waitForStatus) {
this.waitForStatus = waitForStatus;
return this;
}
public ClusterHealthRequest waitForGreenStatus() {
return waitForStatus(ClusterHealthStatus.GREEN);
}
public ClusterHealthRequest waitForYellowStatus() {
return waitForStatus(ClusterHealthStatus.YELLOW);
}
public int waitForRelocatingShards() {
return waitForRelocatingShards;
}
public ClusterHealthRequest waitForRelocatingShards(int waitForRelocatingShards) {
this.waitForRelocatingShards = waitForRelocatingShards;
return this;
}
@Override public ActionRequestValidationException validate() {
return null;
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
super.readFrom(in);
int size = in.readInt();
if (size == 0) {
indices = Strings.EMPTY_ARRAY;
} else {
indices = new String[size];
for (int i = 0; i < indices.length; i++) {
indices[i] = in.readUTF();
}
}
timeout = readTimeValue(in);
if (in.readBoolean()) {
waitForStatus = ClusterHealthStatus.fromValue(in.readByte());
}
waitForRelocatingShards = in.readInt();
}
@Override public void writeTo(DataOutput out) throws IOException {
super.writeTo(out);
if (indices == null) {
out.writeInt(0);
} else {
out.writeInt(indices.length);
for (String index : indices) {
out.writeUTF(index);
}
}
timeout.writeTo(out);
if (waitForStatus == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeByte(waitForStatus.value());
}
out.writeInt(waitForRelocatingShards);
}
}
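
For reference, a minimal sketch (not part of this commit) of how the builder-style setters above chain together; the index names and the ten second timeout are arbitrary placeholders:

import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.util.TimeValue;
import java.util.concurrent.TimeUnit;

// Illustrative only: a request for two hypothetical indices that waits up to
// ten seconds for at least YELLOW status with no shards left relocating.
ClusterHealthRequest request = new ClusterHealthRequest("twitter", "users")
        .waitForYellowStatus()
        .waitForRelocatingShards(0)
        .timeout(new TimeValue(10, TimeUnit.SECONDS));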

View File

@ -0,0 +1,121 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.health;
import com.google.common.collect.Maps;
import org.elasticsearch.action.ActionResponse;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import static org.elasticsearch.action.admin.cluster.health.ClusterIndexHealth.*;
/**
* @author kimchy (shay.banon)
*/
public class ClusterHealthResponse implements ActionResponse, Iterable<ClusterIndexHealth> {
private String clusterName;
int activeShards = 0;
int relocatingShards = 0;
int activePrimaryShards = 0;
boolean timedOut = false;
ClusterHealthStatus status = ClusterHealthStatus.RED;
Map<String, ClusterIndexHealth> indices = Maps.newHashMap();
ClusterHealthResponse() {
}
public ClusterHealthResponse(String clusterName) {
this.clusterName = clusterName;
}
public String clusterName() {
return clusterName;
}
public int activeShards() {
return activeShards;
}
public int relocatingShards() {
return relocatingShards;
}
public int activePrimaryShards() {
return activePrimaryShards;
}
/**
* <tt>true</tt> if the waitForXXX has timed out and did not match.
*/
public boolean timedOut() {
return this.timedOut;
}
public ClusterHealthStatus status() {
return status;
}
public Map<String, ClusterIndexHealth> indices() {
return indices;
}
@Override public Iterator<ClusterIndexHealth> iterator() {
return indices.values().iterator();
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
clusterName = in.readUTF();
activePrimaryShards = in.readInt();
activeShards = in.readInt();
relocatingShards = in.readInt();
status = ClusterHealthStatus.fromValue(in.readByte());
int size = in.readInt();
for (int i = 0; i < size; i++) {
ClusterIndexHealth indexHealth = readClusterIndexHealth(in);
indices.put(indexHealth.index(), indexHealth);
}
timedOut = in.readBoolean();
}
@Override public void writeTo(DataOutput out) throws IOException {
out.writeUTF(clusterName);
out.writeInt(activePrimaryShards);
out.writeInt(activeShards);
out.writeInt(relocatingShards);
out.writeByte(status.value());
out.writeInt(indices.size());
for (ClusterIndexHealth indexHealth : this) {
indexHealth.writeTo(out);
}
out.writeBoolean(timedOut);
}
}
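
Since ClusterHealthResponse is iterable over ClusterIndexHealth (and ClusterIndexHealth, defined below, is iterable over ClusterShardHealth), the full cluster, index and shard levels can be walked directly. A hedged sketch, not part of this commit, that simply prints what it finds:

// Illustrative only: drill down from cluster level to index and shard level.
void printHealth(ClusterHealthResponse health) {
    System.out.println("cluster [" + health.clusterName() + "] status [" + health.status() + "]");
    for (ClusterIndexHealth indexHealth : health) {
        System.out.println("  index [" + indexHealth.index() + "] status [" + indexHealth.status() + "] active shards [" + indexHealth.activeShards() + "]");
        for (ClusterShardHealth shardHealth : indexHealth) {
            System.out.println("    shard [" + shardHealth.id() + "] status [" + shardHealth.status() + "] primary active [" + shardHealth.primaryActive() + "]");
        }
    }
}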

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.health;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
/**
* @author kimchy (shay.banon)
*/
public enum ClusterHealthStatus {
GREEN((byte) 0),
YELLOW((byte) 1),
RED((byte) 2);
private byte value;
ClusterHealthStatus(byte value) {
this.value = value;
}
public byte value() {
return value;
}
public static ClusterHealthStatus fromValue(byte value) {
switch (value) {
case 0:
return GREEN;
case 1:
return YELLOW;
case 2:
return RED;
default:
throw new ElasticSearchIllegalArgumentException("No cluster health status for value [" + value + "]");
}
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.health;
import com.google.common.collect.Maps;
import org.elasticsearch.util.io.Streamable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import static org.elasticsearch.action.admin.cluster.health.ClusterShardHealth.*;
/**
* @author kimchy (shay.banon)
*/
public class ClusterIndexHealth implements Iterable<ClusterShardHealth>, Streamable {
private String index;
private int numberOfShards;
private int numberOfReplicas;
int activeShards = 0;
int relocatingShards = 0;
int activePrimaryShards = 0;
ClusterHealthStatus status = ClusterHealthStatus.RED;
final Map<Integer, ClusterShardHealth> shards = Maps.newHashMap();
private ClusterIndexHealth() {
}
public ClusterIndexHealth(String index, int numberOfShards, int numberOfReplicas) {
this.index = index;
this.numberOfShards = numberOfShards;
this.numberOfReplicas = numberOfReplicas;
}
public String index() {
return index;
}
public int numberOfShards() {
return numberOfShards;
}
public int numberOfReplicas() {
return numberOfReplicas;
}
public int activeShards() {
return activeShards;
}
public int relocatingShards() {
return relocatingShards;
}
public int activePrimaryShards() {
return activePrimaryShards;
}
public ClusterHealthStatus status() {
return status;
}
public Map<Integer, ClusterShardHealth> shards() {
return this.shards;
}
@Override public Iterator<ClusterShardHealth> iterator() {
return shards.values().iterator();
}
public static ClusterIndexHealth readClusterIndexHealth(DataInput in) throws IOException, ClassNotFoundException {
ClusterIndexHealth indexHealth = new ClusterIndexHealth();
indexHealth.readFrom(in);
return indexHealth;
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
index = in.readUTF();
numberOfShards = in.readInt();
numberOfReplicas = in.readInt();
activePrimaryShards = in.readInt();
activeShards = in.readInt();
relocatingShards = in.readInt();
status = ClusterHealthStatus.fromValue(in.readByte());
int size = in.readInt();
for (int i = 0; i < size; i++) {
ClusterShardHealth shardHealth = readClusterShardHealth(in);
shards.put(shardHealth.id(), shardHealth);
}
}
@Override public void writeTo(DataOutput out) throws IOException {
out.writeUTF(index);
out.writeInt(numberOfShards);
out.writeInt(numberOfReplicas);
out.writeInt(activePrimaryShards);
out.writeInt(activeShards);
out.writeInt(relocatingShards);
out.writeByte(status.value());
out.writeInt(shards.size());
for (ClusterShardHealth shardHealth : this) {
shardHealth.writeTo(out);
}
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.health;
import org.elasticsearch.util.io.Streamable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class ClusterShardHealth implements Streamable {
private int shardId;
ClusterHealthStatus status = ClusterHealthStatus.RED;
int activeShards = 0;
int relocatingShards = 0;
boolean primaryActive = false;
private ClusterShardHealth() {
}
ClusterShardHealth(int shardId) {
this.shardId = shardId;
}
public int id() {
return shardId;
}
public ClusterHealthStatus status() {
return status;
}
public int relocatingShards() {
return relocatingShards;
}
public int activeShards() {
return activeShards;
}
public boolean primaryActive() {
return primaryActive;
}
static ClusterShardHealth readClusterShardHealth(DataInput in) throws IOException, ClassNotFoundException {
ClusterShardHealth ret = new ClusterShardHealth();
ret.readFrom(in);
return ret;
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
shardId = in.readInt();
status = ClusterHealthStatus.fromValue(in.readByte());
activeShards = in.readInt();
relocatingShards = in.readInt();
primaryActive = in.readBoolean();
}
@Override public void writeTo(DataOutput out) throws IOException {
out.writeInt(shardId);
out.writeByte(status.value());
out.writeInt(activeShards);
out.writeInt(relocatingShards);
out.writeBoolean(primaryActive);
}
}

View File

@ -0,0 +1,187 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.health;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
import static org.elasticsearch.action.Actions.*;
/**
* @author kimchy (shay.banon)
*/
public class TransportClusterHealthAction extends TransportMasterNodeOperationAction<ClusterHealthRequest, ClusterHealthResponse> {
private final ClusterName clusterName;
private final TimerService timerService;
@Inject public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TimerService timerService, ClusterName clusterName) {
super(settings, transportService, clusterService, threadPool);
this.clusterName = clusterName;
this.timerService = timerService;
}
@Override protected String transportAction() {
return TransportActions.Admin.Cluster.HEALTH;
}
@Override protected ClusterHealthRequest newRequest() {
return new ClusterHealthRequest();
}
@Override protected ClusterHealthResponse newResponse() {
return new ClusterHealthResponse();
}
@Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request) throws ElasticSearchException {
int waitFor = 2;
if (request.waitForStatus() == null) {
waitFor--;
}
if (request.waitForRelocatingShards() == -1) {
waitFor--;
}
if (waitFor == 0) {
// no need to wait for anything
return clusterHealth(request);
}
long endTime = System.currentTimeMillis() + request.timeout().millis();
while (true) {
int waitForCounter = 0;
ClusterHealthResponse response = clusterHealth(request);
if (request.waitForStatus() != null && response.status() == request.waitForStatus()) {
waitForCounter++;
}
if (request.waitForRelocatingShards() != -1 && response.relocatingShards() <= request.waitForRelocatingShards()) {
waitForCounter++;
}
if (waitForCounter == waitFor) {
return response;
}
if (timerService.estimatedTimeInMillis() > endTime) {
response.timedOut = true;
return response;
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
response.timedOut = true;
// we got interrupted, bail
return response;
}
}
}
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request) {
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value());
ClusterState clusterState = clusterService.state();
String[] indices = processIndices(clusterState, request.indices());
for (String index : indices) {
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
IndexMetaData indexMetaData = clusterState.metaData().index(index);
if (indexRoutingTable == null) {
continue;
}
ClusterIndexHealth indexHealth = new ClusterIndexHealth(index, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas());
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
ClusterShardHealth shardHealth = new ClusterShardHealth(shardRoutingTable.shardId().id());
for (ShardRouting shardRouting : shardRoutingTable) {
if (shardRouting.active()) {
shardHealth.activeShards++;
if (shardRouting.relocating()) {
// the shard is relocating, the one it is relocating to will be in the initializing state, so we don't count it
shardHealth.relocatingShards++;
}
if (shardRouting.primary()) {
shardHealth.primaryActive = true;
}
}
}
if (shardHealth.primaryActive) {
if (shardHealth.activeShards == shardRoutingTable.size()) {
shardHealth.status = ClusterHealthStatus.GREEN;
} else {
shardHealth.status = ClusterHealthStatus.YELLOW;
}
} else {
shardHealth.status = ClusterHealthStatus.RED;
}
indexHealth.shards.put(shardHealth.id(), shardHealth);
}
for (ClusterShardHealth shardHealth : indexHealth) {
if (shardHealth.primaryActive()) {
indexHealth.activePrimaryShards++;
}
indexHealth.activeShards += shardHealth.activeShards;
indexHealth.relocatingShards += shardHealth.relocatingShards;
}
// update the index status
indexHealth.status = ClusterHealthStatus.GREEN;
for (ClusterShardHealth shardHealth : indexHealth) {
if (shardHealth.status() == ClusterHealthStatus.RED) {
indexHealth.status = ClusterHealthStatus.RED;
break;
}
if (shardHealth.status() == ClusterHealthStatus.YELLOW) {
indexHealth.status = ClusterHealthStatus.YELLOW;
}
}
response.indices.put(indexHealth.index(), indexHealth);
}
for (ClusterIndexHealth indexHealth : response) {
response.activePrimaryShards += indexHealth.activePrimaryShards;
response.activeShards += indexHealth.activeShards;
response.relocatingShards += indexHealth.relocatingShards;
}
response.status = ClusterHealthStatus.GREEN;
for (ClusterIndexHealth indexHealth : response) {
if (indexHealth.status() == ClusterHealthStatus.RED) {
response.status = ClusterHealthStatus.RED;
break;
}
if (indexHealth.status() == ClusterHealthStatus.YELLOW) {
response.status = ClusterHealthStatus.YELLOW;
}
}
return response;
}
}
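
The status roll-up above follows a worst-of rule: a shard is GREEN when its primary and all copies are active, YELLOW when only the primary is active, and RED when the primary is not active; an index then takes the worst status of its shards, and the cluster the worst status of its indices. A standalone sketch of that rule (illustrative only, not code from this commit):

// Illustrative only: the worst-of roll-up used above for index and cluster status.
static ClusterHealthStatus worstOf(Iterable<ClusterHealthStatus> statuses) {
    ClusterHealthStatus result = ClusterHealthStatus.GREEN;
    for (ClusterHealthStatus status : statuses) {
        if (status == ClusterHealthStatus.RED) {
            return ClusterHealthStatus.RED; // cannot get any worse, stop early
        }
        if (status == ClusterHealthStatus.YELLOW) {
            result = ClusterHealthStatus.YELLOW;
        }
    }
    return result;
}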

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.support.broadcast;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.Strings;
import java.io.DataInput;
import java.io.DataOutput;
@ -78,9 +79,13 @@ public abstract class BroadcastOperationRequest implements ActionRequest {
}
@Override public void writeTo(DataOutput out) throws IOException {
out.writeInt(indices.length);
for (String index : indices) {
out.writeUTF(index);
if (indices == null) {
out.writeInt(0);
} else {
out.writeInt(indices.length);
for (String index : indices) {
out.writeUTF(index);
}
}
if (queryHint == null) {
out.writeBoolean(false);
@ -92,9 +97,14 @@ public abstract class BroadcastOperationRequest implements ActionRequest {
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
indices = new String[in.readInt()];
for (int i = 0; i < indices.length; i++) {
indices[i] = in.readUTF();
int size = in.readInt();
if (size == 0) {
indices = Strings.EMPTY_ARRAY;
} else {
indices = new String[size];
for (int i = 0; i < indices.length; i++) {
indices[i] = in.readUTF();
}
}
if (in.readBoolean()) {
queryHint = in.readUTF();

View File

@ -21,6 +21,8 @@ package org.elasticsearch.client;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
@ -40,6 +42,34 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
*/
public interface ClusterAdminClient {
/**
* The health of the cluster.
*
* @param request The cluster health request
* @return The result future
* @see Requests#clusterHealth(String...)
*/
ActionFuture<ClusterHealthResponse> health(ClusterHealthRequest request);
/**
* The health of the cluster.
*
* @param request The cluster health request
* @param listener A listener to be notified with a result
* @return The result future
* @see Requests#clusterHealth(String...)
*/
ActionFuture<ClusterHealthResponse> health(ClusterHealthRequest request, ActionListener<ClusterHealthResponse> listener);
/**
* The health of the cluster.
*
* @param request The cluster health request
* @param listener A listener to be notified with a result
* @see Requests#clusterHealth(String...)
*/
void execHealth(ClusterHealthRequest request, ActionListener<ClusterHealthResponse> listener);
/**
* The state of the cluster.
*
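
As a usage illustration of the new interface methods (not part of this commit), assuming a Client instance named client and an index named "test" as placeholders, and using the Requests.clusterHealth factory added later in this commit:

// Illustrative only: block until the cluster reaches YELLOW or the request times out.
void waitForYellow(Client client) {
    ClusterHealthResponse health = client.admin().cluster()
            .health(Requests.clusterHealth("test").waitForYellowStatus())
            .actionGet();
    if (health.timedOut()) {
        // the waitFor conditions were not met; health.status() still reflects the last observed state
    }
}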

View File

@ -19,6 +19,7 @@
package org.elasticsearch.client;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRequest;
@ -233,6 +234,17 @@ public class Requests {
return new ClusterStateRequest();
}
/**
* Creates a cluster health request.
*
* @param indices The indices to check health against. Use <tt>null</tt> or <tt>_all</tt> to execute against all indices
* @return The cluster health request
* @see org.elasticsearch.client.ClusterAdminClient#health(org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest)
*/
public static ClusterHealthRequest clusterHealth(String... indices) {
return new ClusterHealthRequest(indices);
}
/**
* Creates a nodes info request against all the nodes.
*

View File

@ -22,6 +22,9 @@ package org.elasticsearch.client.server;
import com.google.inject.Inject;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo;
@ -46,6 +49,8 @@ import org.elasticsearch.util.settings.Settings;
*/
public class ServerClusterAdminClient extends AbstractComponent implements ClusterAdminClient {
private final TransportClusterHealthAction clusterHealthAction;
private final TransportClusterStateAction clusterStateAction;
private final TransportSinglePingAction singlePingAction;
@ -57,10 +62,11 @@ public class ServerClusterAdminClient extends AbstractComponent implements Clust
private final TransportNodesInfo nodesInfo;
@Inject public ServerClusterAdminClient(Settings settings,
TransportClusterStateAction clusterStateAction,
TransportClusterHealthAction clusterHealthAction, TransportClusterStateAction clusterStateAction,
TransportSinglePingAction singlePingAction, TransportBroadcastPingAction broadcastPingAction, TransportReplicationPingAction replicationPingAction,
TransportNodesInfo nodesInfo) {
super(settings);
this.clusterHealthAction = clusterHealthAction;
this.clusterStateAction = clusterStateAction;
this.nodesInfo = nodesInfo;
this.singlePingAction = singlePingAction;
@ -68,6 +74,18 @@ public class ServerClusterAdminClient extends AbstractComponent implements Clust
this.replicationPingAction = replicationPingAction;
}
@Override public ActionFuture<ClusterHealthResponse> health(ClusterHealthRequest request) {
return clusterHealthAction.submit(request);
}
@Override public ActionFuture<ClusterHealthResponse> health(ClusterHealthRequest request, ActionListener<ClusterHealthResponse> listener) {
return clusterHealthAction.submit(request, listener);
}
@Override public void execHealth(ClusterHealthRequest request, ActionListener<ClusterHealthResponse> listener) {
clusterHealthAction.execute(request, listener);
}
@Override public ActionFuture<ClusterStateResponse> state(ClusterStateRequest request) {
return clusterStateAction.submit(request);
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client.transport.action;
import com.google.inject.AbstractModule;
import org.elasticsearch.client.transport.action.admin.cluster.health.ClientTransportClusterHealthAction;
import org.elasticsearch.client.transport.action.admin.cluster.node.info.ClientTransportNodesInfoAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction;
@ -43,7 +44,7 @@ import org.elasticsearch.client.transport.action.search.ClientTransportSearchScr
import org.elasticsearch.client.transport.action.terms.ClientTransportTermsAction;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class ClientTransportActionModule extends AbstractModule {
@ -70,5 +71,6 @@ public class ClientTransportActionModule extends AbstractModule {
bind(ClientTransportReplicationPingAction.class).asEagerSingleton();
bind(ClientTransportBroadcastPingAction.class).asEagerSingleton();
bind(ClientTransportClusterStateAction.class).asEagerSingleton();
bind(ClientTransportClusterHealthAction.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport.action.admin.cluster.health;
import com.google.inject.Inject;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.transport.action.support.BaseClientTransportAction;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
/**
* @author kimchy (Shay Banon)
*/
public class ClientTransportClusterHealthAction extends BaseClientTransportAction<ClusterHealthRequest, ClusterHealthResponse> {
@Inject public ClientTransportClusterHealthAction(Settings settings, TransportService transportService) {
super(settings, transportService, ClusterHealthResponse.class);
}
@Override protected String action() {
return TransportActions.Admin.Cluster.HEALTH;
}
}

View File

@ -23,6 +23,8 @@ import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
@ -35,6 +37,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.client.transport.action.admin.cluster.health.ClientTransportClusterHealthAction;
import org.elasticsearch.client.transport.action.admin.cluster.node.info.ClientTransportNodesInfoAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction;
@ -51,6 +54,8 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple
private final TransportClientNodesService nodesService;
private final ClientTransportClusterHealthAction clusterHealthAction;
private final ClientTransportClusterStateAction clusterStateAction;
private final ClientTransportSinglePingAction singlePingAction;
@ -62,11 +67,12 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple
private final ClientTransportNodesInfoAction nodesInfoAction;
@Inject public InternalTransportClusterAdminClient(Settings settings, TransportClientNodesService nodesService,
ClientTransportClusterStateAction clusterStateAction,
ClientTransportClusterHealthAction clusterHealthAction, ClientTransportClusterStateAction clusterStateAction,
ClientTransportSinglePingAction singlePingAction, ClientTransportReplicationPingAction replicationPingAction, ClientTransportBroadcastPingAction broadcastPingAction,
ClientTransportNodesInfoAction nodesInfoAction) {
super(settings);
this.nodesService = nodesService;
this.clusterHealthAction = clusterHealthAction;
this.clusterStateAction = clusterStateAction;
this.nodesInfoAction = nodesInfoAction;
this.singlePingAction = singlePingAction;
@ -74,6 +80,31 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple
this.broadcastPingAction = broadcastPingAction;
}
@Override public ActionFuture<ClusterHealthResponse> health(final ClusterHealthRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<ClusterHealthResponse>>() {
@Override public ActionFuture<ClusterHealthResponse> doWithNode(Node node) throws ElasticSearchException {
return clusterHealthAction.submit(node, request);
}
});
}
@Override public ActionFuture<ClusterHealthResponse> health(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<ClusterHealthResponse>>() {
@Override public ActionFuture<ClusterHealthResponse> doWithNode(Node node) throws ElasticSearchException {
return clusterHealthAction.submit(node, request, listener);
}
});
}
@Override public void execHealth(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
clusterHealthAction.execute(node, request, listener);
return null;
}
});
}
@Override public ActionFuture<ClusterStateResponse> state(final ClusterStateRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<ClusterStateResponse>>() {
@Override public ActionFuture<ClusterStateResponse> doWithNode(Node node) throws ElasticSearchException {

View File

@ -357,7 +357,6 @@ public class DefaultShardsRoutingStrategy implements ShardsRoutingStrategy {
shard.deassignNode();
shards.remove();
} else {
assert shard.state() == ShardRoutingState.RELOCATING;
shard.cancelRelocation();
}
break;

View File

@ -145,18 +145,24 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent {
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
return;
}
try {
indexShard.snapshot(new Engine.SnapshotHandler() {
@Override public void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogSize != translogSnapshot.size()) {
shardGateway.snapshot(snapshotIndexCommit, translogSnapshot);
lastIndexVersion = snapshotIndexCommit.getVersion();
lastTranslogId = translogSnapshot.translogId();
lastTranslogSize = translogSnapshot.size();
}
}
});
} catch (IllegalIndexShardStateException e) {
// ignore, that's fine
} catch (Exception e) {
logger.warn("Failed to snapshot on close", e);
}
}
public void close() {

View File

@ -48,7 +48,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.*;
@ -77,6 +79,16 @@ public class RecoveryAction extends AbstractIndexShardComponent {
private final String snapshotTransportAction;
private volatile boolean closed = false;
private volatile Thread sendStartRecoveryThread;
private volatile Thread receiveSnapshotRecoveryThread;
private volatile Thread sendSnapshotRecoveryThread;
private final CopyOnWriteArrayList<Future> sendFileChunksRecoveryFutures = new CopyOnWriteArrayList<Future>();
@Inject public RecoveryAction(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, TransportService transportService, IndexShard indexShard, Store store) {
super(shardId, indexSettings);
this.threadPool = threadPool;
@ -96,64 +108,97 @@ public class RecoveryAction extends AbstractIndexShardComponent {
}
public void close() {
closed = true;
transportService.removeHandler(startTransportAction);
transportService.removeHandler(fileChunkTransportAction);
transportService.removeHandler(snapshotTransportAction);
cleanOpenIndex();
if (true) {
// disable the interruptions for now
return;
}
// interrupt the startRecovery thread if its performing recovery
if (sendStartRecoveryThread != null) {
sendStartRecoveryThread.interrupt();
}
if (receiveSnapshotRecoveryThread != null) {
receiveSnapshotRecoveryThread.interrupt();
}
if (sendSnapshotRecoveryThread != null) {
sendSnapshotRecoveryThread.interrupt();
}
for (Future future : sendFileChunksRecoveryFutures) {
future.cancel(true);
}
}
public synchronized void startRecovery(Node node, Node targetNode, boolean markAsRelocated) throws ElasticSearchException {
// mark the shard as recovering
IndexShardState preRecoveringState;
sendStartRecoveryThread = Thread.currentThread();
try {
preRecoveringState = indexShard.recovering();
} catch (IndexShardRecoveringException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
throw new IgnoreRecoveryException("Already in recovering process", e);
} catch (IndexShardStartedException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already started
throw new IgnoreRecoveryException("Already in recovering process", e);
} catch (IndexShardRelocatedException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already relocated
throw new IgnoreRecoveryException("Already in recovering process", e);
} catch (IndexShardClosedException e) {
throw new IgnoreRecoveryException("can't recover a closed shard.", e);
}
logger.debug("Starting recovery from {}", targetNode);
StopWatch stopWatch = new StopWatch().start();
try {
RecoveryStatus recoveryStatus = transportService.submitRequest(targetNode, startTransportAction, new StartRecoveryRequest(node, markAsRelocated), new FutureTransportResponseHandler<RecoveryStatus>() {
@Override public RecoveryStatus newInstance() {
return new RecoveryStatus();
// mark the shard as recovering
IndexShardState preRecoveringState;
try {
preRecoveringState = indexShard.recovering();
} catch (IndexShardRecoveringException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
throw new IgnoreRecoveryException("Already in recovering process", e);
} catch (IndexShardStartedException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already started
throw new IgnoreRecoveryException("Already in recovering process", e);
} catch (IndexShardRelocatedException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already relocated
throw new IgnoreRecoveryException("Already in recovering process", e);
} catch (IndexShardClosedException e) {
throw new IgnoreRecoveryException("Can't recover a closed shard.", e);
}
logger.debug("Starting recovery from {}", targetNode);
StopWatch stopWatch = new StopWatch().start();
try {
if (closed) {
throw new IgnoreRecoveryException("Recovery closed");
}
}).txGet();
stopWatch.stop();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Recovery completed from ").append(targetNode).append(", took [").append(stopWatch.totalTime()).append("]\n");
sb.append(" Phase1: recovered [").append(recoveryStatus.phase1FileNames.size()).append("]")
.append(" files with total size of [").append(new SizeValue(recoveryStatus.phase1TotalSize)).append("]")
.append(", took [").append(new TimeValue(recoveryStatus.phase1Time, MILLISECONDS)).append("]")
.append("\n");
sb.append(" Phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(new TimeValue(recoveryStatus.phase2Time, MILLISECONDS)).append("]")
.append("\n");
sb.append(" Phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations")
.append(", took [").append(new TimeValue(recoveryStatus.phase3Time, MILLISECONDS)).append("]");
logger.debug(sb.toString());
RecoveryStatus recoveryStatus = transportService.submitRequest(targetNode, startTransportAction, new StartRecoveryRequest(node, markAsRelocated), new FutureTransportResponseHandler<RecoveryStatus>() {
@Override public RecoveryStatus newInstance() {
return new RecoveryStatus();
}
}).txGet();
stopWatch.stop();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Recovery completed from ").append(targetNode).append(", took [").append(stopWatch.totalTime()).append("]\n");
sb.append(" Phase1: recovered [").append(recoveryStatus.phase1FileNames.size()).append("]")
.append(" files with total size of [").append(new SizeValue(recoveryStatus.phase1TotalSize)).append("]")
.append(", took [").append(new TimeValue(recoveryStatus.phase1Time, MILLISECONDS)).append("]")
.append("\n");
sb.append(" Phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(new TimeValue(recoveryStatus.phase2Time, MILLISECONDS)).append("]")
.append("\n");
sb.append(" Phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations")
.append(", took [").append(new TimeValue(recoveryStatus.phase3Time, MILLISECONDS)).append("]");
logger.debug(sb.toString());
}
} catch (RemoteTransportException e) {
if (closed) {
throw new IgnoreRecoveryException("Recovery closed", e);
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof ActionNotFoundTransportException || cause instanceof IndexShardNotStartedException) {
// the remote shard has not yet registered the action or not started yet, we need to ignore this recovery attempt, and restore the state previous to recovering
indexShard.restoreRecoveryState(preRecoveringState);
throw new IgnoreRecoveryException("Ignoring recovery attempt, remote shard not started", e);
}
throw new RecoveryFailedException(shardId, node, targetNode, e);
} catch (Exception e) {
if (closed) {
throw new IgnoreRecoveryException("Recovery closed", e);
}
throw new RecoveryFailedException(shardId, node, targetNode, e);
}
} catch (RemoteTransportException e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof ActionNotFoundTransportException ||
cause instanceof IndexShardNotStartedException) {
// the remote shard has not yet registered the action or not started yet, we need to ignore this recovery attempt, and restore the state previous to recovering
indexShard.restoreRecoveryState(preRecoveringState);
throw new IgnoreRecoveryException("Ignoring recovery attempt, remote shard not started", e);
}
throw new RecoveryFailedException(shardId, node, targetNode, e);
} catch (Exception e) {
throw new RecoveryFailedException(shardId, node, targetNode, e);
} finally {
sendStartRecoveryThread = null;
}
}
@ -226,7 +271,7 @@ public class RecoveryAction extends AbstractIndexShardComponent {
final CountDownLatch latch = new CountDownLatch(snapshot.getFiles().length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String name : snapshot.getFiles()) {
threadPool.execute(new Runnable() {
sendFileChunksRecoveryFutures.add(threadPool.submit(new Runnable() {
@Override public void run() {
IndexInput indexInput = null;
try {
@ -256,7 +301,7 @@ public class RecoveryAction extends AbstractIndexShardComponent {
latch.countDown();
}
}
});
}));
}
latch.await();
@ -270,31 +315,49 @@ public class RecoveryAction extends AbstractIndexShardComponent {
recoveryStatus.phase1Time = stopWatch.totalTime().millis();
} catch (Throwable e) {
throw new RecoverFilesRecoveryException(shardId, snapshot.getFiles().length, new SizeValue(totalSize), e);
} finally {
sendFileChunksRecoveryFutures.clear();
}
}
@Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
logger.trace("Recovery [phase2] to {}: sending [{}] transaction log operations", node, snapshot.size());
StopWatch stopWatch = new StopWatch().start();
sendSnapshot(snapshot, false);
stopWatch.stop();
logger.trace("Recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase2Time = stopWatch.totalTime().millis();
recoveryStatus.phase2Operations = snapshot.size();
sendSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("Recovery [phase2] to {}: sending [{}] transaction log operations", node, snapshot.size());
StopWatch stopWatch = new StopWatch().start();
sendSnapshot(snapshot, false);
stopWatch.stop();
logger.trace("Recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase2Time = stopWatch.totalTime().millis();
recoveryStatus.phase2Operations = snapshot.size();
} finally {
sendSnapshotRecoveryThread = null;
}
}
@Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
logger.trace("Recovery [phase3] to {}: sending [{}] transaction log operations", node, snapshot.size());
StopWatch stopWatch = new StopWatch().start();
sendSnapshot(snapshot, true);
if (startRecoveryRequest.markAsRelocated) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
indexShard.relocated();
sendSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("Recovery [phase3] to {}: sending [{}] transaction log operations", node, snapshot.size());
StopWatch stopWatch = new StopWatch().start();
sendSnapshot(snapshot, true);
if (startRecoveryRequest.markAsRelocated) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
indexShard.relocated();
}
stopWatch.stop();
logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase3Time = stopWatch.totalTime().millis();
recoveryStatus.phase3Operations = snapshot.size();
} finally {
sendSnapshotRecoveryThread = null;
}
stopWatch.stop();
logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase3Time = stopWatch.totalTime().millis();
recoveryStatus.phase3Operations = snapshot.size();
}
private void sendSnapshot(Translog.Snapshot snapshot, boolean phase3) throws ElasticSearchException {
@ -371,16 +434,24 @@ public class RecoveryAction extends AbstractIndexShardComponent {
}
@Override public void messageReceived(SnapshotWrapper snapshot, TransportChannel channel) throws Exception {
if (!snapshot.phase3) {
// clean open index outputs in any case (there should not be any open, we close them in the chunk)
cleanOpenIndex();
receiveSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
if (!snapshot.phase3) {
// clean open index outputs in any case (there should not be any open, we close them in the chunk)
cleanOpenIndex();
}
indexShard.performRecovery(snapshot.snapshot, snapshot.phase3);
if (snapshot.phase3) {
indexShard.refresh(new Engine.Refresh(true));
// probably need to do more here...
}
channel.sendResponse(VoidStreamable.INSTANCE);
} finally {
receiveSnapshotRecoveryThread = null;
}
indexShard.performRecovery(snapshot.snapshot, snapshot.phase3);
if (snapshot.phase3) {
indexShard.refresh(new Engine.Refresh(true));
// probably need to do more here...
}
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
@ -417,6 +488,9 @@ public class RecoveryAction extends AbstractIndexShardComponent {
}
@Override public void messageReceived(FileChunk request, TransportChannel channel) throws Exception {
if (closed) {
throw new IndexShardClosedException(shardId);
}
IndexOutput indexOutput;
if (request.position == 0) {
// first request

View File

@ -20,6 +20,7 @@
package org.elasticsearch.rest.action;
import com.google.inject.AbstractModule;
import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction;
import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
import org.elasticsearch.rest.action.admin.cluster.ping.broadcast.RestBroadcastPingAction;
import org.elasticsearch.rest.action.admin.cluster.ping.replication.RestReplicationPingAction;
@ -52,6 +53,7 @@ public class RestActionModule extends AbstractModule {
bind(RestNodesInfoAction.class).asEagerSingleton();
bind(RestClusterStateAction.class).asEagerSingleton();
bind(RestClusterHealthAction.class).asEagerSingleton();
bind(RestSinglePingAction.class).asEagerSingleton();
bind(RestBroadcastPingAction.class).asEagerSingleton();

View File

@ -0,0 +1,142 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.health;
import com.google.inject.Inject;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.*;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestJsonBuilder;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.rest.RestResponse.Status.*;
/**
* @author kimchy (shay.banon)
*/
public class RestClusterHealthAction extends BaseRestHandler {
@Inject public RestClusterHealthAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/health", this);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/health/{index}", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
ClusterHealthRequest clusterHealthRequest = clusterHealth(RestActions.splitIndices(request.param("index")));
int level = 0;
try {
clusterHealthRequest.timeout(request.paramAsTime("timeout", clusterHealthRequest.timeout()));
String waitForStatus = request.param("waitForStatus");
if (waitForStatus != null) {
clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase()));
}
clusterHealthRequest.waitForRelocatingShards(request.paramAsInt("waitForRelocatingShards", clusterHealthRequest.waitForRelocatingShards()));
String sLevel = request.param("level");
if (sLevel != null) {
if ("cluster".equals("sLevel")) {
level = 0;
} else if ("indices".equals(sLevel)) {
level = 1;
} else if ("shards".equals(sLevel)) {
level = 2;
}
}
} catch (Exception e) {
try {
JsonBuilder builder = RestJsonBuilder.restJsonBuilder(request);
channel.sendResponse(new JsonRestResponse(request, PRECONDITION_FAILED, builder.startObject().field("error", e.getMessage()).endObject()));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
return;
}
final int fLevel = level;
client.admin().cluster().execHealth(clusterHealthRequest, new ActionListener<ClusterHealthResponse>() {
@Override public void onResponse(ClusterHealthResponse response) {
try {
JsonBuilder builder = RestJsonBuilder.restJsonBuilder(request);
builder.startObject();
builder.field("status", response.status().name().toLowerCase());
builder.field("timedOut", response.timedOut());
builder.field("activePrimaryShards", response.activePrimaryShards());
builder.field("activeShards", response.activeShards());
builder.field("relocatingShards", response.relocatingShards());
if (fLevel > 0) {
builder.startObject("indices");
for (ClusterIndexHealth indexHealth : response) {
builder.startObject(indexHealth.index());
builder.field("status", indexHealth.status().name().toLowerCase());
builder.field("numberOfShards", indexHealth.numberOfShards());
builder.field("numberOfReplicas", indexHealth.numberOfReplicas());
builder.field("activePrimaryShards", indexHealth.activePrimaryShards());
builder.field("activeShards", indexHealth.activeShards());
builder.field("relocatingShards", indexHealth.relocatingShards());
if (fLevel > 1) {
builder.startObject("shards");
for (ClusterShardHealth shardHealth : indexHealth) {
builder.startObject(Integer.toString(shardHealth.id()));
builder.field("status", shardHealth.status().name().toLowerCase());
builder.field("primaryActive", shardHealth.primaryActive());
builder.field("activeShards", shardHealth.activeShards());
builder.field("relocatingShards", shardHealth.relocatingShards());
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
channel.sendResponse(new JsonRestResponse(request, RestResponse.Status.OK, builder));
} catch (Exception e) {
onFailure(e);
}
}
@Override public void onFailure(Throwable e) {
try {
channel.sendResponse(new JsonThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}
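
The handler above reuses the asynchronous execHealth variant with an ActionListener; the same callback pattern is available to Java client users directly. An illustrative sketch, with client and the index name "test" as placeholders:

// Illustrative only: asynchronous health check mirroring the handler's callback style.
void checkHealthAsync(Client client) {
    client.admin().cluster().execHealth(Requests.clusterHealth("test").waitForGreenStatus(),
            new ActionListener<ClusterHealthResponse>() {
                @Override public void onResponse(ClusterHealthResponse response) {
                    // inspect response.status(), response.activeShards(), ...
                }

                @Override public void onFailure(Throwable e) {
                    // handle or log the failure
                }
            });
}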

View File

@ -19,6 +19,10 @@
package org.elasticsearch.test.integration.indexlifecycle;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.RoutingNode;
@ -61,9 +65,18 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
ClusterService clusterService1 = ((InternalServer) server("server1")).injector().getInstance(ClusterService.class);
logger.info("Creating index [test]");
client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
CreateIndexResponse createIndexResponse = client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
assertThat(createIndexResponse.acknowledged(), equalTo(true));
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
// sleep till the cluster state gets published, since we check the master
Thread.sleep(200);
Thread.sleep(1000);
ClusterState clusterState1 = clusterService1.state();
RoutingNode routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
@ -79,7 +92,14 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
ClusterService clusterService2 = ((InternalServer) server("server2")).injector().getInstance(ClusterService.class);
Thread.sleep(1500);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
// sleep till the cluster state gets published, since we check the master
Thread.sleep(200);
clusterState1 = clusterService1.state();
routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
@ -95,7 +115,17 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
ClusterService clusterService3 = ((InternalServer) server("server3")).injector().getInstance(ClusterService.class);
Thread.sleep(1500);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.relocatingShards(), equalTo(0));
assertThat(clusterHealth.activeShards(), equalTo(22));
assertThat(clusterHealth.activePrimaryShards(), equalTo(11));
// sleep till the cluster state gets published, since we check the master
Thread.sleep(200);
clusterState1 = clusterService1.state();
routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
@ -114,8 +144,20 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
logger.info("Closing server1");
// kill the first server
closeServer("server1");
// wait a bit so it will be discovered as removed
Thread.sleep(200);
// verify health
logger.info("Running Cluster Health");
clusterHealth = client("server2").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.relocatingShards(), equalTo(0));
assertThat(clusterHealth.activeShards(), equalTo(22));
assertThat(clusterHealth.activePrimaryShards(), equalTo(11));
Thread.sleep(1500);
// sleep till the cluster state gets published, since we check the master
Thread.sleep(200);
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
@ -129,9 +171,10 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
logger.info("Deleting index [test]");
// last, let's delete the index
client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet();
DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet();
assertThat(deleteIndexResponse.acknowledged(), equalTo(true));
Thread.sleep(1500);
Thread.sleep(200);
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2, nullValue());
@ -155,9 +198,17 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
ClusterService clusterService1 = ((InternalServer) server("server1")).injector().getInstance(ClusterService.class);
logger.info("Creating index [test]");
client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
CreateIndexResponse createIndexResponse = client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
assertThat(createIndexResponse.acknowledged(), equalTo(true));
Thread.sleep(1000);
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.relocatingShards(), equalTo(0));
assertThat(clusterHealth.activeShards(), equalTo(11));
assertThat(clusterHealth.activePrimaryShards(), equalTo(11));
ClusterState clusterState1 = clusterService1.state();
RoutingNode routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
@ -166,11 +217,23 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
// start another server
logger.info("Starting server2");
startServer("server2", settings);
// wait a bit
Thread.sleep(200);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.relocatingShards(), equalTo(0));
assertThat(clusterHealth.activeShards(), equalTo(11));
assertThat(clusterHealth.activePrimaryShards(), equalTo(11));
// sleep till the cluster state gets published, since we check the master
Thread.sleep(200);
ClusterService clusterService2 = ((InternalServer) server("server2")).injector().getInstance(ClusterService.class);
Thread.sleep(2000);
clusterState1 = clusterService1.state();
routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), anyOf(equalTo(6), equalTo(5)));
@ -182,10 +245,22 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
// start another server
logger.info("Starting server3");
startServer("server3");
// wait a bit so assignment will start
Thread.sleep(200);
ClusterService clusterService3 = ((InternalServer) server("server3")).injector().getInstance(ClusterService.class);
Thread.sleep(1500);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.relocatingShards(), equalTo(0));
assertThat(clusterHealth.activeShards(), equalTo(11));
assertThat(clusterHealth.activePrimaryShards(), equalTo(11));
// sleep till the cluster state gets published, since we check the master
Thread.sleep(200);
clusterState1 = clusterService1.state();
routingNodeEntry1 = clusterState1.routingNodes().nodesToShards().get(clusterState1.nodes().localNodeId());
@ -204,8 +279,20 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
logger.info("Closing server1");
// kill the first server
closeServer("server1");
// wait a bit so it will be discovered as removed
Thread.sleep(200);
Thread.sleep(2000);
logger.info("Running Cluster Health");
clusterHealth = client("server3").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.relocatingShards(), equalTo(0));
assertThat(clusterHealth.activeShards(), equalTo(11));
assertThat(clusterHealth.activePrimaryShards(), equalTo(11));
// sleep till the cluster state gets published, since we check the master
Thread.sleep(200);
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
@ -219,9 +306,9 @@ public class IndexLifecycleActionTests extends AbstractServersTests {
logger.info("Deleting index [test]");
// last, let's delete the index
client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet();
DeleteIndexResponse deleteIndexResponse = client("server2").admin().indices().delete(deleteIndexRequest("test")).actionGet();
assertThat(deleteIndexResponse.acknowledged(), equalTo(true));
Thread.sleep(2000);
clusterState2 = clusterService2.state();
routingNodeEntry2 = clusterState2.routingNodes().nodesToShards().get(clusterState2.nodes().localNodeId());
assertThat(routingNodeEntry2, nullValue());
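The same block (run the health request, assert it did not time out, assert the expected status and shard counts, then sleep briefly so the published cluster state can be inspected) repeats throughout this test. A hypothetical helper, not part of this commit, sketching how it could be factored out; it assumes the test's existing static imports and that client(...) returns the standard Client interface:

// hypothetical refactoring sketch; names and parameters are illustrative only
private void assertGreenHealth(Client client, int expectedActiveShards, int expectedActivePrimaryShards) throws Exception {
    ClusterHealthResponse clusterHealth = client.admin().cluster()
            .health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
    assertThat(clusterHealth.timedOut(), equalTo(false));
    assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
    assertThat(clusterHealth.relocatingShards(), equalTo(0));
    assertThat(clusterHealth.activeShards(), equalTo(expectedActiveShards));
    assertThat(clusterHealth.activePrimaryShards(), equalTo(expectedActivePrimaryShards));
    // sleep till the cluster state gets published, since we check the master
    Thread.sleep(200);
}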