only count data_nodes with recover_after_nodes parameter

This commit is contained in:
kimchy 2010-07-15 01:12:00 +03:00
parent 61fadb4dc0
commit ca8ad83092
1 changed files with 6 additions and 5 deletions

View File

@ -96,9 +96,9 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
if (discoveryService.initialStateReceived()) { if (discoveryService.initialStateReceived()) {
ClusterState clusterState = clusterService.state(); ClusterState clusterState = clusterService.state();
if (clusterState.nodes().localNodeMaster() && !clusterState.metaData().recoveredFromGateway()) { if (clusterState.nodes().localNodeMaster() && !clusterState.metaData().recoveredFromGateway()) {
if (recoverAfterNodes != -1 && clusterState.nodes().size() < recoverAfterNodes) { if (recoverAfterNodes != -1 && clusterState.nodes().dataNodes().size() < recoverAfterNodes) {
updateClusterStateBlockedOnNotRecovered(); updateClusterStateBlockedOnNotRecovered();
logger.debug("not recovering from gateway, nodes_size [" + clusterState.nodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]"); logger.debug("not recovering from gateway, data_nodes_size [" + clusterState.nodes().dataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
} else { } else {
if (readFromGateway.compareAndSet(false, true)) { if (readFromGateway.compareAndSet(false, true)) {
Boolean waited = readFromGateway(initialStateTimeout); Boolean waited = readFromGateway(initialStateTimeout);
@ -136,8 +136,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
if (event.localNodeMaster()) { if (event.localNodeMaster()) {
if (!event.state().metaData().recoveredFromGateway()) { if (!event.state().metaData().recoveredFromGateway()) {
ClusterState clusterState = event.state(); ClusterState clusterState = event.state();
if (recoverAfterNodes != -1 && clusterState.nodes().size() < recoverAfterNodes) { if (recoverAfterNodes != -1 && clusterState.nodes().dataNodes().size() < recoverAfterNodes) {
logger.debug("not recovering from gateway, nodes_size [" + clusterState.nodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]"); logger.debug("not recovering from gateway, data_nodes_size [" + clusterState.nodes().dataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
} else { } else {
if (readFromGateway.compareAndSet(false, true)) { if (readFromGateway.compareAndSet(false, true)) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@ -236,7 +236,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private void updateClusterStateFromGateway(final MetaData fMetaData, final CountDownLatch latch) { private void updateClusterStateFromGateway(final MetaData fMetaData, final CountDownLatch latch) {
clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("gateway (recovered meta-data)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) { @Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder().metaData(currentState.metaData()); MetaData.Builder metaDataBuilder = newMetaDataBuilder()
.metaData(currentState.metaData());
// mark the metadata as read from gateway // mark the metadata as read from gateway
metaDataBuilder.markAsRecoveredFromGateway(); metaDataBuilder.markAsRecoveredFromGateway();
return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build(); return newClusterStateBuilder().state(currentState).metaData(metaDataBuilder).build();