Gateway: Add `recover_after_data_nodes` and `recover_after_master_nodes`, closes #376.

This commit is contained in:
kimchy 2010-09-19 22:46:13 +02:00
parent 95bb02b330
commit 25246902cc
2 changed files with 26 additions and 4 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.node;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -115,6 +116,10 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
return masterNodes();
}
public ImmutableMap<String, DiscoveryNode> masterAndDataNodes() {
return MapBuilder.<String, DiscoveryNode>newMapBuilder().putAll(dataNodes).putAll(masterNodes).immutableMap();
}
public DiscoveryNode get(String nodeId) {
return nodes.get(nodeId);
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -59,6 +60,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private final TimeValue initialStateTimeout;
private final TimeValue recoverAfterTime;
private final int recoverAfterNodes;
private final int recoverAfterDataNodes;
private final int recoverAfterMasterNodes;
private final AtomicBoolean performedStateRecovery = new AtomicBoolean();
@ -73,6 +76,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
// allow to control a delay of when indices will get created
this.recoverAfterTime = componentSettings.getAsTime("recover_after_time", null);
this.recoverAfterNodes = componentSettings.getAsInt("recover_after_nodes", -1);
this.recoverAfterDataNodes = componentSettings.getAsInt("recover_after_data_nodes", -1);
this.recoverAfterMasterNodes = componentSettings.getAsInt("recover_after_master_nodes", -1);
}
@Override protected void doStart() throws ElasticSearchException {
@ -81,10 +86,17 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
// node from starting until we recovered properly
if (discoveryService.initialStateReceived()) {
ClusterState clusterState = clusterService.state();
DiscoveryNodes nodes = clusterState.nodes();
if (clusterState.nodes().localNodeMaster() && !clusterState.metaData().recoveredFromGateway()) {
if (recoverAfterNodes != -1 && clusterState.nodes().dataNodes().size() < recoverAfterNodes) {
if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
updateClusterStateBlockedOnNotRecovered();
logger.debug("not recovering from gateway, data_nodes_size [" + clusterState.nodes().dataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
} else if (recoverAfterDataNodes != -1 && nodes.dataNodes().size() < recoverAfterDataNodes) {
updateClusterStateBlockedOnNotRecovered();
logger.debug("not recovering from gateway, nodes_size (data) [" + nodes.dataNodes().size() + "] < recover_after_data_nodes [" + recoverAfterDataNodes + "]");
} else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) {
updateClusterStateBlockedOnNotRecovered();
logger.debug("not recovering from gateway, nodes_size (master) [" + nodes.masterNodes().size() + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]");
} else if (recoverAfterTime != null) {
updateClusterStateBlockedOnNotRecovered();
logger.debug("not recovering from gateway, recover_after_time [{}]", recoverAfterTime);
@ -116,8 +128,13 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
if (event.localNodeMaster()) {
if (!event.state().metaData().recoveredFromGateway()) {
ClusterState clusterState = event.state();
if (recoverAfterNodes != -1 && clusterState.nodes().dataNodes().size() < recoverAfterNodes) {
logger.debug("not recovering from gateway, data_nodes_size [" + clusterState.nodes().dataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
DiscoveryNodes nodes = clusterState.nodes();
if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
} else if (recoverAfterDataNodes != -1 && nodes.dataNodes().size() < recoverAfterDataNodes) {
logger.debug("not recovering from gateway, nodes_size (data) [" + nodes.dataNodes().size() + "] < recover_after_data_nodes [" + recoverAfterDataNodes + "]");
} else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) {
logger.debug("not recovering from gateway, nodes_size (master) [" + nodes.masterNodes().size() + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]");
} else {
if (performedStateRecovery.compareAndSet(false, true)) {
threadPool.cached().execute(new Runnable() {