Allow cluster access during node restart (#42946) (#43272)

This commit modifies InternalTestCluster to allow using client() and
other cluster operations inside a RestartCallback (typically from
onNodeStopped). A restarting node is now removed from the cluster's node
map, so all methods return state as if the restarting node does not exist.

This avoids various exceptions stemming from accessing the stopped
node(s).
Henning Andersen authored on 2019-06-17 15:04:17 +02:00, committed by GitHub
parent 4b58827beb
commit ba15d08e14
3 changed files with 52 additions and 40 deletions
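
For illustration, here is a minimal sketch of the kind of test this change enables. The class, test name, and assertions below are hypothetical and not part of this commit; they assume the standard ESIntegTestCase setup. Because a stopped node is removed from the node map, client() and other cluster-wide helpers resolve against the remaining live nodes inside onNodeStopped:

package org.elasticsearch.test.test;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

// Illustrative only: a hypothetical test showing the usage pattern this commit enables.
public class ClusterAccessDuringRestartIT extends ESIntegTestCase {

    public void testClientUsableWhileNodeIsStopped() throws Exception {
        internalCluster().startMasterOnlyNode();
        internalCluster().startDataOnlyNodes(2);
        internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                // The stopped node is no longer in the node map, so these calls
                // operate on the remaining live nodes instead of throwing.
                ensureGreen();
                internalCluster().validateClusterFormed();
                assertFalse(client().admin().cluster().prepareHealth().get().isTimedOut());
                return super.onNodeStopped(nodeName);
            }
        });
    }
}

The commit's own InternalTestClusterIT test (last file below) exercises the same pattern, including a nested restart of a second data node.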

GatewayIndexStateIT.java

@@ -39,6 +39,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -55,7 +56,11 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -369,14 +374,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
// this one is not validated ahead of time and breaks allocation
.put("index.analysis.filter.myCollator.type", "icu_collation")
).build();
internalCluster().fullRestart(new RestartCallback(){
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta);
return super.onNodeStopped(nodeName);
}
});
writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta));
// check that the cluster does not keep reallocating shards
assertBusy(() -> {
@@ -443,14 +441,7 @@
final IndexMetaData metaData = state.getMetaData().index("test");
final IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(metaData.getSettings()
.filter((s) -> "index.analysis.analyzer.test.tokenizer".equals(s) == false)).build();
internalCluster().fullRestart(new RestartCallback(){
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta);
return super.onNodeStopped(nodeName);
}
});
writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta));
// check that the cluster does not keep reallocating shards
assertBusy(() -> {
@@ -495,14 +486,7 @@
final MetaData brokenMeta = MetaData.builder(metaData).persistentSettings(Settings.builder()
.put(metaData.persistentSettings()).put("this.is.unknown", true)
.put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), "broken").build()).build();
internalCluster().fullRestart(new RestartCallback(){
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta);
return super.onNodeStopped(nodeName);
}
});
writeBrokenMeta(metaStateService -> metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta));
ensureYellow("test"); // wait for state recovery
state = client().admin().cluster().prepareState().get().getState();
@@ -519,4 +503,17 @@
+ MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()));
assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L);
}
private void writeBrokenMeta(CheckedConsumer<MetaStateService, IOException> writer) throws Exception {
Map<String, MetaStateService> metaStateServices = Stream.of(internalCluster().getNodeNames())
.collect(Collectors.toMap(Function.identity(), nodeName -> internalCluster().getInstance(MetaStateService.class, nodeName)));
internalCluster().fullRestart(new RestartCallback(){
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
final MetaStateService metaStateService = metaStateServices.get(nodeName);
writer.accept(metaStateService);
return super.onNodeStopped(nodeName);
}
});
}
}
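
A note on the writeBrokenMeta helper added above: the MetaStateService instances are collected for every node before the full restart because, with this change, a stopped node is no longer present in the cluster's node map, so internalCluster().getInstance(MetaStateService.class, nodeName) could not resolve it from inside onNodeStopped. Capturing the per-node services up front preserves the behaviour of the three original call sites.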

InternalTestCluster.java

@@ -144,7 +144,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.util.Collections.emptyList;
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
import static org.apache.lucene.util.LuceneTestCase.rarely;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
@@ -972,6 +971,7 @@ public final class InternalTestCluster extends TestCluster {
Settings closeForRestart(RestartCallback callback, int minMasterNodes) throws Exception {
assert callback != null;
close();
removeNode(this);
Settings callbackSettings = callback.onNodeStopped(name);
assert callbackSettings != null;
Settings.Builder newSettings = Settings.builder();
@@ -1805,20 +1805,9 @@
removeExclusions(excludedNodeIds);
boolean success = false;
try {
nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(emptyList()));
nodeAndClient.startNode();
success = true;
} finally {
if (success == false) {
removeNode(nodeAndClient);
}
}
if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(Collections.singletonList(nodeAndClient)));
nodeAndClient.startNode();
publishNode(nodeAndClient);
if (callback.validateClusterForming() || excludedNodeIds.isEmpty() == false) {
// we have to validate cluster size to ensure that the restarted node has rejoined the cluster if it was master-eligible;
@@ -1894,6 +1883,7 @@
Map<Set<DiscoveryNodeRole>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()];
final int minMasterNodes = autoManageMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1;
final int nodeCount = nodes.size();
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Stopping and resetting node [{}] ", nodeAndClient.name);
@@ -1907,7 +1897,7 @@
nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
}
assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodes.size();
assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodeCount;
// randomize start up order, but making sure that:
// 1) A data folder that was assigned to a data node will stay so
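
The InternalTestCluster changes reduce to one piece of bookkeeping: closeForRestart removes the node from the node map before invoking the callback, and the node is only published back once it has been recreated and started. The old success/removeNode error handling around recreateNode is dropped because the node is already absent from the map while it is stopped. Below is a toy sketch of that lifecycle in plain Java with illustrative names; it is not Elasticsearch code:

import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the restart bookkeeping introduced by this commit: while a node is
// restarting it is absent from the registry, so whatever the callback does only sees
// the remaining nodes; the node is put back into the registry afterwards.
public class RestartBookkeepingSketch {

    @FunctionalInterface
    interface RestartCallback {
        void onNodeStopped(String nodeName, Map<String, String> liveNodes) throws Exception;
    }

    private final Map<String, String> nodes = new LinkedHashMap<>(); // node name -> handle

    void restartNode(String name, RestartCallback callback) throws Exception {
        final String handle = nodes.remove(name);   // node disappears from the map while stopped
        callback.onNodeStopped(name, nodes);        // callback runs against the remaining nodes only
        nodes.put(name, handle);                    // publish the node again (in the real cluster, after recreate + start)
    }

    public static void main(String[] args) throws Exception {
        RestartBookkeepingSketch cluster = new RestartBookkeepingSketch();
        cluster.nodes.put("node_t0", "handle-0");
        cluster.nodes.put("node_t1", "handle-1");
        cluster.restartNode("node_t0", (stopped, live) ->
            System.out.println("while " + stopped + " is down, visible nodes: " + live.keySet()));
        System.out.println("after restart: " + cluster.nodes.keySet());
    }
}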

InternalTestClusterIT.java

@@ -18,8 +18,11 @@
*/
package org.elasticsearch.test.test;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import java.io.IOException;
@@ -61,4 +64,26 @@ public class InternalTestClusterIT extends ESIntegTestCase {
ensureGreen();
}
public void testOperationsDuringRestart() throws Exception {
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(2);
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
ensureGreen();
internalCluster().validateClusterFormed();
assertNotNull(internalCluster().getInstance(NodeClient.class));
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
ensureGreen();
internalCluster().validateClusterFormed();
return super.onNodeStopped(nodeName);
}
});
return super.onNodeStopped(nodeName);
}
});
}
}