diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java index e8367af5c62..59e5091faef 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; @@ -55,7 +56,11 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; @@ -369,14 +374,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase { // this one is not validated ahead of time and breaks allocation .put("index.analysis.filter.myCollator.type", "icu_collation") ).build(); - internalCluster().fullRestart(new RestartCallback(){ - @Override - public Settings onNodeStopped(String nodeName) throws Exception { - final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName); - metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta); - return super.onNodeStopped(nodeName); - } - }); + writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta)); // check that the cluster does not keep reallocating shards assertBusy(() -> { @@ -443,14 +441,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase { final IndexMetaData metaData = state.getMetaData().index("test"); final IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(metaData.getSettings() .filter((s) -> "index.analysis.analyzer.test.tokenizer".equals(s) == false)).build(); - internalCluster().fullRestart(new RestartCallback(){ - @Override - public Settings onNodeStopped(String nodeName) throws Exception { - final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName); - metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta); - return super.onNodeStopped(nodeName); - } - }); + writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta)); // check that the cluster does not keep reallocating shards assertBusy(() -> { @@ -495,14 +486,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase { final MetaData brokenMeta = MetaData.builder(metaData).persistentSettings(Settings.builder() .put(metaData.persistentSettings()).put("this.is.unknown", true) .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), "broken").build()).build(); - internalCluster().fullRestart(new RestartCallback(){ - @Override - public Settings onNodeStopped(String nodeName) throws Exception { - final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName); - metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta); - return super.onNodeStopped(nodeName); - } - }); + writeBrokenMeta(metaStateService -> metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta)); ensureYellow("test"); // wait for state recovery state = client().admin().cluster().prepareState().get().getState(); @@ -519,4 +503,17 @@ public class GatewayIndexStateIT extends ESIntegTestCase { + MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey())); assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L); } + + private void writeBrokenMeta(CheckedConsumer writer) throws Exception { + Map metaStateServices = Stream.of(internalCluster().getNodeNames()) + .collect(Collectors.toMap(Function.identity(), nodeName -> internalCluster().getInstance(MetaStateService.class, nodeName))); + internalCluster().fullRestart(new RestartCallback(){ + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + final MetaStateService metaStateService = metaStateServices.get(nodeName); + writer.accept(metaStateService); + return super.onNodeStopped(nodeName); + } + }); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index fcc178cae57..de0103e2b57 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -144,7 +144,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import static java.util.Collections.emptyList; import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; import static org.apache.lucene.util.LuceneTestCase.rarely; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; @@ -972,6 +971,7 @@ public final class InternalTestCluster extends TestCluster { Settings closeForRestart(RestartCallback callback, int minMasterNodes) throws Exception { assert callback != null; close(); + removeNode(this); Settings callbackSettings = callback.onNodeStopped(name); assert callbackSettings != null; Settings.Builder newSettings = Settings.builder(); @@ -1805,20 +1805,9 @@ public final class InternalTestCluster extends TestCluster { removeExclusions(excludedNodeIds); - boolean success = false; - try { - nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(emptyList())); - nodeAndClient.startNode(); - success = true; - } finally { - if (success == false) { - removeNode(nodeAndClient); - } - } - - if (activeDisruptionScheme != null) { - activeDisruptionScheme.applyToNode(nodeAndClient.name, this); - } + nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(Collections.singletonList(nodeAndClient))); + nodeAndClient.startNode(); + publishNode(nodeAndClient); if (callback.validateClusterForming() || excludedNodeIds.isEmpty() == false) { // we have to validate cluster size to ensure that the restarted node has rejoined the cluster if it was master-eligible; @@ -1894,6 +1883,7 @@ public final class InternalTestCluster extends TestCluster { Map, List> nodesByRoles = new HashMap<>(); Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()]; final int minMasterNodes = autoManageMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1; + final int nodeCount = nodes.size(); for (NodeAndClient nodeAndClient : nodes.values()) { callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient()); logger.info("Stopping and resetting node [{}] ", nodeAndClient.name); @@ -1907,7 +1897,7 @@ public final class InternalTestCluster extends TestCluster { nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient); } - assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodes.size(); + assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodeCount; // randomize start up order, but making sure that: // 1) A data folder that was assigned to a data node will stay so diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java index 160103e8f24..4e708975ef9 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java @@ -18,8 +18,11 @@ */ package org.elasticsearch.test.test; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.InternalTestCluster; import java.io.IOException; @@ -61,4 +64,26 @@ public class InternalTestClusterIT extends ESIntegTestCase { ensureGreen(); } + + public void testOperationsDuringRestart() throws Exception { + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNodes(2); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ensureGreen(); + internalCluster().validateClusterFormed(); + assertNotNull(internalCluster().getInstance(NodeClient.class)); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ensureGreen(); + internalCluster().validateClusterFormed(); + return super.onNodeStopped(nodeName); + } + }); + return super.onNodeStopped(nodeName); + } + }); + } }