don't block node master node startup if recover_after_time is set and nodes settings are not met (we already handle it with the not recovered state)

This commit is contained in:
kimchy 2011-03-04 05:11:28 +02:00
parent 4b92928c77
commit 20ed540fe7

View File

@ -20,7 +20,6 @@
package org.elasticsearch.gateway;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -30,7 +29,6 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -41,7 +39,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -66,7 +63,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private final MetaDataCreateIndexService createIndexService;
private final TimeValue initialStateTimeout;
private final TimeValue recoverAfterTime;
private final int recoverAfterNodes;
private final int expectedNodes;
@ -86,7 +82,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
this.discoveryService = discoveryService;
this.createIndexService = createIndexService;
this.threadPool = threadPool;
this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30));
// allow to control a delay of when indices will get created
this.recoverAfterTime = componentSettings.getAsTime("recover_after_time", null);
this.recoverAfterNodes = componentSettings.getAsInt("recover_after_nodes", -1);
@ -115,24 +110,24 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
} else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) {
logger.debug("not recovering from gateway, nodes_size (master) [" + nodes.masterNodes().size() + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]");
} else {
boolean ignoreTimeout;
boolean ignoreRecoverAfterTime;
if (expectedNodes == -1 && expectedMasterNodes == -1 && expectedDataNodes == -1) {
// no expected is set, don't ignore the timeout
ignoreTimeout = false;
ignoreRecoverAfterTime = false;
} else {
// one of the expected is set, see if all of them meet the need, and ignore the timeout in this case
ignoreTimeout = true;
ignoreRecoverAfterTime = true;
if (expectedNodes != -1 && (nodes.masterAndDataNodes().size() < expectedNodes)) { // does not meet the expected...
ignoreTimeout = false;
ignoreRecoverAfterTime = false;
}
if (expectedMasterNodes != -1 && (nodes.masterNodes().size() < expectedMasterNodes)) { // does not meet the expected...
ignoreTimeout = false;
ignoreRecoverAfterTime = false;
}
if (expectedDataNodes != -1 && (nodes.dataNodes().size() < expectedDataNodes)) { // does not meet the expected...
ignoreTimeout = false;
ignoreRecoverAfterTime = false;
}
}
performStateRecovery(initialStateTimeout, ignoreTimeout);
performStateRecovery(ignoreRecoverAfterTime);
}
}
} else {
@ -151,7 +146,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
}
@Override public void clusterChanged(final ClusterChangedEvent event) {
if (!lifecycle.started()) {
if (lifecycle.stoppedOrClosed()) {
return;
}
if (event.localNodeMaster() && event.state().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
@ -164,42 +159,37 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
} else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) {
logger.debug("not recovering from gateway, nodes_size (master) [" + nodes.masterNodes().size() + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]");
} else {
boolean ignoreTimeout;
boolean ignoreRecoverAfterTime;
if (expectedNodes == -1 && expectedMasterNodes == -1 && expectedDataNodes == -1) {
// no expected is set, don't ignore the timeout
ignoreTimeout = false;
ignoreRecoverAfterTime = false;
} else {
// one of the expected is set, see if all of them meet the need, and ignore the timeout in this case
ignoreTimeout = true;
ignoreRecoverAfterTime = true;
if (expectedNodes != -1 && (nodes.masterAndDataNodes().size() < expectedNodes)) { // does not meet the expected...
ignoreTimeout = false;
ignoreRecoverAfterTime = false;
}
if (expectedMasterNodes != -1 && (nodes.masterNodes().size() < expectedMasterNodes)) { // does not meet the expected...
ignoreTimeout = false;
ignoreRecoverAfterTime = false;
}
if (expectedDataNodes != -1 && (nodes.dataNodes().size() < expectedDataNodes)) { // does not meet the expected...
ignoreTimeout = false;
ignoreRecoverAfterTime = false;
}
}
final boolean fIgnoreTimeout = ignoreTimeout;
final boolean fIgnoreRecoverAfterTime = ignoreRecoverAfterTime;
threadPool.cached().execute(new Runnable() {
@Override public void run() {
performStateRecovery(null, fIgnoreTimeout);
performStateRecovery(fIgnoreRecoverAfterTime);
}
});
}
}
}
private void performStateRecovery(@Nullable TimeValue timeout) {
performStateRecovery(null, false);
}
private void performStateRecovery(boolean ignoreRecoverAfterTime) {
final Gateway.GatewayStateRecoveredListener recoveryListener = new GatewayRecoveryListener(new CountDownLatch(1));
private void performStateRecovery(@Nullable TimeValue timeout, boolean ignoreTimeout) {
final CountDownLatch latch = new CountDownLatch(1);
final Gateway.GatewayStateRecoveredListener recoveryListener = new GatewayRecoveryListener(latch);
if (!ignoreTimeout && recoverAfterTime != null) {
if (!ignoreRecoverAfterTime && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
logger.debug("delaying initial state recovery for [{}]", recoverAfterTime);
threadPool.schedule(recoverAfterTime, ThreadPool.Names.CACHED, new Runnable() {
@ -215,14 +205,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
gateway.performStateRecovery(recoveryListener);
}
}
if (timeout != null) {
try {
latch.await(timeout.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new ElasticSearchInterruptedException(e.getMessage(), e);
}
}
}
class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener {