[Gateway] set a default of 5m to `recover_after_time` when any to the `expected*Nodes` is set

The `recovery_after_time` tells the gateway to wait before starting recovery from disk. The goal here is to allow for more nodes to join the cluster and thus not start potentially unneeded replications. The `expectedNodes` setting (and friends) tells the gateway when it can start recovering even if the `recover_after_time` has not yet elapsed. However, `expectedNodes` is useless if one doesn't set `recovery_after_time`. This commit changes that by setting a sensible default of 5m for `recover_after_time` *if* a `expectedNodes` setting is present.

Closes #6742
This commit is contained in:
Boaz Leskes 2014-07-04 20:12:05 +02:00
parent af4eee594c
commit f480969503
5 changed files with 285 additions and 81 deletions

View File

@ -42,14 +42,14 @@ once all `gateway.recover_after...nodes` conditions are met.
The `gateway.expected_nodes` allows to set how many data and master The `gateway.expected_nodes` allows to set how many data and master
eligible nodes are expected to be in the cluster, and once met, the eligible nodes are expected to be in the cluster, and once met, the
`recover_after_time` is ignored and recovery starts. The `gateway.recover_after_time` is ignored and recovery starts.
`gateway.expected_data_nodes` and `gateway.expected_master_nodes` Setting `gateway.expected_nodes` also defaults `gateway.recovery_after_time` to `5m` coming[1.3.0, before `expected_nodes`
required `recovery_after_time` to be set]. The `gateway.expected_data_nodes` and `gateway.expected_master_nodes`
settings are also supported. For example setting: settings are also supported. For example setting:
[source,js] [source,js]
-------------------------------------------------- --------------------------------------------------
gateway: gateway:
recover_after_nodes: 1
recover_after_time: 5m recover_after_time: 5m
expected_nodes: 2 expected_nodes: 2
-------------------------------------------------- --------------------------------------------------

View File

@ -18,9 +18,8 @@ For example:
[source,js] [source,js]
-------------------------------------------------- --------------------------------------------------
gateway: gateway:
recover_after_nodes: 1 recover_after_nodes: 3
recover_after_time: 5m expected_nodes: 5
expected_nodes: 2
-------------------------------------------------- --------------------------------------------------
[float] [float]

View File

@ -50,6 +50,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
public static final TimeValue DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET = TimeValue.timeValueMinutes(5);
private final Gateway gateway; private final Gateway gateway;
private final ThreadPool threadPool; private final ThreadPool threadPool;
@ -81,14 +83,20 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
this.discoveryService = discoveryService; this.discoveryService = discoveryService;
this.threadPool = threadPool; this.threadPool = threadPool;
// allow to control a delay of when indices will get created // allow to control a delay of when indices will get created
this.recoverAfterTime = componentSettings.getAsTime("recover_after_time", null);
this.recoverAfterNodes = componentSettings.getAsInt("recover_after_nodes", -1);
this.expectedNodes = componentSettings.getAsInt("expected_nodes", -1); this.expectedNodes = componentSettings.getAsInt("expected_nodes", -1);
this.recoverAfterDataNodes = componentSettings.getAsInt("recover_after_data_nodes", -1);
this.expectedDataNodes = componentSettings.getAsInt("expected_data_nodes", -1); this.expectedDataNodes = componentSettings.getAsInt("expected_data_nodes", -1);
this.expectedMasterNodes = componentSettings.getAsInt("expected_master_nodes", -1);
TimeValue defaultRecoverAfterTime = null;
if (expectedNodes >= 0 || expectedDataNodes >= 0 || expectedMasterNodes >= 0) {
defaultRecoverAfterTime = DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET;
}
this.recoverAfterTime = componentSettings.getAsTime("recover_after_time", defaultRecoverAfterTime);
this.recoverAfterNodes = componentSettings.getAsInt("recover_after_nodes", -1);
this.recoverAfterDataNodes = componentSettings.getAsInt("recover_after_data_nodes", -1);
// default the recover after master nodes to the minimum master nodes in the discovery // default the recover after master nodes to the minimum master nodes in the discovery
this.recoverAfterMasterNodes = componentSettings.getAsInt("recover_after_master_nodes", settings.getAsInt("discovery.zen.minimum_master_nodes", -1)); this.recoverAfterMasterNodes = componentSettings.getAsInt("recover_after_master_nodes", settings.getAsInt("discovery.zen.minimum_master_nodes", -1));
this.expectedMasterNodes = componentSettings.getAsInt("expected_master_nodes", -1);
// Add the not recovered as initial state block, we don't allow anything until // Add the not recovered as initial state block, we don't allow anything until
this.clusterService.addInitialStateBlock(STATE_NOT_RECOVERED_BLOCK); this.clusterService.addInitialStateBlock(STATE_NOT_RECOVERED_BLOCK);
@ -101,36 +109,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
// node from starting until we recovered properly // node from starting until we recovered properly
if (discoveryService.initialStateReceived()) { if (discoveryService.initialStateReceived()) {
ClusterState clusterState = clusterService.state(); ClusterState clusterState = clusterService.state();
DiscoveryNodes nodes = clusterState.nodes();
if (clusterState.nodes().localNodeMaster() && clusterState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) { if (clusterState.nodes().localNodeMaster() && clusterState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
if (clusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) { checkStateMeetsSettingsAndMaybeRecover(clusterState, false);
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
} else if (recoverAfterDataNodes != -1 && nodes.dataNodes().size() < recoverAfterDataNodes) {
logger.debug("not recovering from gateway, nodes_size (data) [" + nodes.dataNodes().size() + "] < recover_after_data_nodes [" + recoverAfterDataNodes + "]");
} else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) {
logger.debug("not recovering from gateway, nodes_size (master) [" + nodes.masterNodes().size() + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]");
} else {
boolean ignoreRecoverAfterTime;
if (expectedNodes == -1 && expectedMasterNodes == -1 && expectedDataNodes == -1) {
// no expected is set, don't ignore the timeout
ignoreRecoverAfterTime = false;
} else {
// one of the expected is set, see if all of them meet the need, and ignore the timeout in this case
ignoreRecoverAfterTime = true;
if (expectedNodes != -1 && (nodes.masterAndDataNodes().size() < expectedNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false;
}
if (expectedMasterNodes != -1 && (nodes.masterNodes().size() < expectedMasterNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false;
}
if (expectedDataNodes != -1 && (nodes.dataNodes().size() < expectedDataNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false;
}
}
performStateRecovery(ignoreRecoverAfterTime);
}
} }
} else { } else {
logger.debug("can't wait on start for (possibly) reading state from gateway, will do it asynchronously"); logger.debug("can't wait on start for (possibly) reading state from gateway, will do it asynchronously");
@ -161,9 +141,13 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
scheduledRecovery.set(false); scheduledRecovery.set(false);
} }
if (event.localNodeMaster() && event.state().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) { if (event.localNodeMaster() && event.state().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
ClusterState clusterState = event.state(); checkStateMeetsSettingsAndMaybeRecover(event.state(), true);
DiscoveryNodes nodes = clusterState.nodes(); }
if (event.state().blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) { }
protected void checkStateMeetsSettingsAndMaybeRecover(ClusterState state, boolean asyncRecovery) {
DiscoveryNodes nodes = state.nodes();
if (state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) {
logger.debug("not recovering from gateway, no master elected yet"); logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) { } else if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]"); logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
@ -172,45 +156,42 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
} else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) { } else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) {
logger.debug("not recovering from gateway, nodes_size (master) [" + nodes.masterNodes().size() + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]"); logger.debug("not recovering from gateway, nodes_size (master) [" + nodes.masterNodes().size() + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]");
} else { } else {
boolean ignoreRecoverAfterTime; boolean enforceRecoverAfterTime;
String reason;
if (expectedNodes == -1 && expectedMasterNodes == -1 && expectedDataNodes == -1) { if (expectedNodes == -1 && expectedMasterNodes == -1 && expectedDataNodes == -1) {
// no expected is set, don't ignore the timeout // no expected is set, honor the setting if they are there
ignoreRecoverAfterTime = false; enforceRecoverAfterTime = true;
reason = "recovery_after_time was set to [" + recoverAfterTime + "]";
} else { } else {
// one of the expected is set, see if all of them meet the need, and ignore the timeout in this case // one of the expected is set, see if all of them meet the need, and ignore the timeout in this case
ignoreRecoverAfterTime = true; enforceRecoverAfterTime = false;
reason = "";
if (expectedNodes != -1 && (nodes.masterAndDataNodes().size() < expectedNodes)) { // does not meet the expected... if (expectedNodes != -1 && (nodes.masterAndDataNodes().size() < expectedNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false; enforceRecoverAfterTime = true;
} reason = "expecting [" + expectedNodes + "] nodes, but only have [" + nodes.masterAndDataNodes().size() + "]";
if (expectedMasterNodes != -1 && (nodes.masterNodes().size() < expectedMasterNodes)) { // does not meet the expected... } else if (expectedDataNodes != -1 && (nodes.dataNodes().size() < expectedDataNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false; enforceRecoverAfterTime = true;
} reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.dataNodes().size() + "]";
if (expectedDataNodes != -1 && (nodes.dataNodes().size() < expectedDataNodes)) { // does not meet the expected... } else if (expectedMasterNodes != -1 && (nodes.masterNodes().size() < expectedMasterNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false; enforceRecoverAfterTime = true;
reason = "expecting [" + expectedMasterNodes + "] master nodes, but only have [" + nodes.masterNodes().size() + "]";
} }
} }
final boolean fIgnoreRecoverAfterTime = ignoreRecoverAfterTime; performStateRecovery(asyncRecovery, enforceRecoverAfterTime, reason);
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
performStateRecovery(fIgnoreRecoverAfterTime);
}
});
}
} }
} }
private void performStateRecovery(boolean ignoreRecoverAfterTime) { private void performStateRecovery(boolean asyncRecovery, boolean enforceRecoverAfterTime, String reason) {
final Gateway.GatewayStateRecoveredListener recoveryListener = new GatewayRecoveryListener(new CountDownLatch(1)); final Gateway.GatewayStateRecoveredListener recoveryListener = new GatewayRecoveryListener(new CountDownLatch(1));
if (!ignoreRecoverAfterTime && recoverAfterTime != null) { if (enforceRecoverAfterTime && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) { if (scheduledRecovery.compareAndSet(false, true)) {
logger.debug("delaying initial state recovery for [{}]", recoverAfterTime); logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
threadPool.schedule(recoverAfterTime, ThreadPool.Names.GENERIC, new Runnable() { threadPool.schedule(recoverAfterTime, ThreadPool.Names.GENERIC, new Runnable() {
@Override @Override
public void run() { public void run() {
if (recovered.compareAndSet(false, true)) { if (recovered.compareAndSet(false, true)) {
logger.trace("performing state recovery..."); logger.info("recovery_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
gateway.performStateRecovery(recoveryListener); gateway.performStateRecovery(recoveryListener);
} }
} }
@ -218,11 +199,20 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
} }
} else { } else {
if (recovered.compareAndSet(false, true)) { if (recovered.compareAndSet(false, true)) {
if (asyncRecovery) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
gateway.performStateRecovery(recoveryListener);
}
});
} else {
logger.trace("performing state recovery..."); logger.trace("performing state recovery...");
gateway.performStateRecovery(recoveryListener); gateway.performStateRecovery(recoveryListener);
} }
} }
} }
}
class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener { class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener {
@ -300,4 +290,10 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
logger.info("metadata state not restored, reason: {}", message); logger.info("metadata state not restored, reason: {}", message);
} }
} }
// used for testing
public TimeValue recoverAfterTime() {
return recoverAfterTime;
}
} }

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.cluster.NoopClusterService;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.io.IOException;
public class GatewayServiceTests extends ElasticsearchTestCase {
private GatewayService createService(ImmutableSettings.Builder settings) {
return new GatewayService(ImmutableSettings.builder()
.put("http.enabled", "false")
.put("discovery.type", "local")
.put(settings.build()).build(), null, null, new NoopClusterService(), null, null);
}
@Test
public void testDefaultRecoverAfterTime() throws IOException {
// check that the default is not set
GatewayService service = createService(ImmutableSettings.builder());
assertNull(service.recoverAfterTime());
// ensure default is set when setting expected_nodes
service = createService(ImmutableSettings.builder().put("gateway.expected_nodes", 1));
assertThat(service.recoverAfterTime(), Matchers.equalTo(GatewayService.DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET));
// ensure default is set when setting expected_data_nodes
service = createService(ImmutableSettings.builder().put("gateway.expected_data_nodes", 1));
assertThat(service.recoverAfterTime(), Matchers.equalTo(GatewayService.DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET));
// ensure default is set when setting expected_master_nodes
service = createService(ImmutableSettings.builder().put("gateway.expected_master_nodes", 1));
assertThat(service.recoverAfterTime(), Matchers.equalTo(GatewayService.DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET));
// ensure settings override default
TimeValue timeValue = TimeValue.timeValueHours(3);
// ensure default is set when setting expected_nodes
service = createService(ImmutableSettings.builder().put("gateway.expected_nodes", 1).put("gateway.recover_after_time", timeValue.toString()));
assertThat(service.recoverAfterTime().millis(), Matchers.equalTo(timeValue.millis()));
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.cluster;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.unit.TimeValue;
import java.util.List;
public class NoopClusterService implements ClusterService {
@Override
public DiscoveryNode localNode() {
return null;
}
@Override
public ClusterState state() {
return null;
}
@Override
public void addInitialStateBlock(ClusterBlock block) throws ElasticsearchIllegalStateException {
}
@Override
public void removeInitialStateBlock(ClusterBlock block) throws ElasticsearchIllegalStateException {
}
@Override
public OperationRouting operationRouting() {
return null;
}
@Override
public void addFirst(ClusterStateListener listener) {
}
@Override
public void addLast(ClusterStateListener listener) {
}
@Override
public void add(ClusterStateListener listener) {
}
@Override
public void remove(ClusterStateListener listener) {
}
@Override
public void add(LocalNodeMasterListener listener) {
}
@Override
public void remove(LocalNodeMasterListener listener) {
}
@Override
public void add(TimeValue timeout, TimeoutClusterStateListener listener) {
}
@Override
public void submitStateUpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
}
@Override
public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {
}
@Override
public List<PendingClusterTask> pendingTasks() {
return null;
}
@Override
public Lifecycle.State lifecycleState() {
return null;
}
@Override
public void addLifecycleListener(LifecycleListener listener) {
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
}
@Override
public ClusterService start() throws ElasticsearchException {
return null;
}
@Override
public ClusterService stop() throws ElasticsearchException {
return null;
}
@Override
public void close() throws ElasticsearchException {
}
}