Gateway: Add gateway.expected_nodes for state recovery, closes #404.

This commit is contained in:
kimchy 2010-10-04 16:20:32 +02:00
parent a512c79f14
commit 1f49eb0b9d
2 changed files with 51 additions and 19 deletions

View File

@ -60,11 +60,15 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private final TimeValue initialStateTimeout;
private final TimeValue recoverAfterTime;
private final int recoverAfterNodes;
private final int expectedNodes;
private final int recoverAfterDataNodes;
private final int expectedDataNodes;
private final int recoverAfterMasterNodes;
private final int expectedMasterNodes;
private final AtomicBoolean performedStateRecovery = new AtomicBoolean();
private final AtomicBoolean recovered = new AtomicBoolean();
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
@Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService, ThreadPool threadPool) {
super(settings);
@ -76,8 +80,11 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
// allow to control a delay of when indices will get created
this.recoverAfterTime = componentSettings.getAsTime("recover_after_time", null);
this.recoverAfterNodes = componentSettings.getAsInt("recover_after_nodes", -1);
this.expectedNodes = componentSettings.getAsInt("expected_nodes", -1);
this.recoverAfterDataNodes = componentSettings.getAsInt("recover_after_data_nodes", -1);
this.expectedDataNodes = componentSettings.getAsInt("expected__data_nodes", -1);
this.recoverAfterMasterNodes = componentSettings.getAsInt("recover_after_master_nodes", -1);
this.expectedMasterNodes = componentSettings.getAsInt("expected__master_nodes", -1);
}
@Override protected void doStart() throws ElasticSearchException {
@ -101,9 +108,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
updateClusterStateBlockedOnNotRecovered();
logger.debug("not recovering from gateway, recover_after_time [{}]", recoverAfterTime);
} else {
if (performedStateRecovery.compareAndSet(false, true)) {
performStateRecovery(initialStateTimeout);
}
performStateRecovery(initialStateTimeout);
}
}
} else {
@ -136,19 +141,39 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
} else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) {
logger.debug("not recovering from gateway, nodes_size (master) [" + nodes.masterNodes().size() + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]");
} else {
if (performedStateRecovery.compareAndSet(false, true)) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
performStateRecovery(null);
}
});
boolean ignoreTimeout;
if (expectedNodes == -1 && expectedMasterNodes == -1 && expectedDataNodes == -1) {
// no expected is set, don't ignore the timeout
ignoreTimeout = false;
} else {
// one of the expected is set, see if all of them meet the need, and ignore the timeout in this case
ignoreTimeout = true;
if (expectedNodes != -1 && (nodes.masterAndDataNodes().size() < expectedNodes)) { // does not meet the expected...
ignoreTimeout = false;
}
if (expectedMasterNodes != -1 && (nodes.masterNodes().size() < expectedMasterNodes)) { // does not meet the expected...
ignoreTimeout = false;
}
if (expectedDataNodes != -1 && (nodes.dataNodes().size() < expectedDataNodes)) { // does not meet the expected...
ignoreTimeout = false;
}
}
final boolean fIgnoreTimeout = ignoreTimeout;
threadPool.cached().execute(new Runnable() {
@Override public void run() {
performStateRecovery(null, fIgnoreTimeout);
}
});
}
}
}
}
private void performStateRecovery(@Nullable TimeValue timeout) {
performStateRecovery(null, false);
}
private void performStateRecovery(@Nullable TimeValue timeout, boolean ignoreTimeout) {
final CountDownLatch latch = new CountDownLatch(1);
final Gateway.GatewayStateRecoveredListener recoveryListener = new Gateway.GatewayStateRecoveredListener() {
@Override public void onSuccess() {
@ -162,16 +187,22 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
}
};
if (recoverAfterTime != null) {
updateClusterStateBlockedOnNotRecovered();
logger.debug("delaying initial state recovery for [{}]", recoverAfterTime);
threadPool.schedule(new Runnable() {
@Override public void run() {
gateway.performStateRecovery(recoveryListener);
}
}, recoverAfterTime);
if (!ignoreTimeout && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
updateClusterStateBlockedOnNotRecovered();
logger.debug("delaying initial state recovery for [{}]", recoverAfterTime);
threadPool.schedule(new Runnable() {
@Override public void run() {
if (recovered.compareAndSet(false, true)) {
gateway.performStateRecovery(recoveryListener);
}
}
}, recoverAfterTime);
}
} else {
gateway.performStateRecovery(recoveryListener);
if (recovered.compareAndSet(false, true)) {
gateway.performStateRecovery(recoveryListener);
}
}
if (timeout != null) {

View File

@ -57,6 +57,7 @@ public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements
}
@Override public void performStateRecovery(GatewayStateRecoveredListener listener) throws GatewayException {
logger.debug("performing state recovery");
listener.onSuccess();
}