diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java index 56102704848..e793e407e1a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java @@ -47,6 +47,8 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; import java.util.Optional; import java.util.Set; import java.util.function.BooleanSupplier; @@ -130,7 +132,11 @@ public class DiscoveryUpgradeService { : lastAcceptedClusterState.getMinimumMasterNodesOnPublishingMaster(); assert joiningRound == null : joiningRound; - joiningRound = new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes); + final Set knownMasterNodeIds = new HashSet<>(); + lastAcceptedClusterState.nodes().getMasterNodes().forEach(c -> knownMasterNodeIds.add(c.key)); + + joiningRound + = new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes, knownMasterNodeIds); joiningRound.scheduleNextAttempt(); } @@ -168,10 +174,12 @@ public class DiscoveryUpgradeService { private class JoiningRound { private final boolean upgrading; private final int minimumMasterNodes; + private final Set knownMasterNodeIds; - JoiningRound(boolean upgrading, int minimumMasterNodes) { + JoiningRound(boolean upgrading, int minimumMasterNodes, Set knownMasterNodeIds) { this.upgrading = upgrading; this.minimumMasterNodes = minimumMasterNodes; + this.knownMasterNodeIds = knownMasterNodeIds; } private boolean isRunning() { @@ -210,8 +218,20 @@ public class DiscoveryUpgradeService { // no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade transportService.getThreadPool().generic().execute(() -> { try { - initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream() - .map(DiscoveryNode::getId).collect(Collectors.toSet()))); + Set nodeIds = new HashSet<>(); + discoveryNodes.forEach(n -> nodeIds.add(n.getId())); + + final Iterator knownNodeIdIterator = knownMasterNodeIds.iterator(); + while (nodeIds.size() < 2 * minimumMasterNodes - 1 && knownNodeIdIterator.hasNext()) { + nodeIds.add(knownNodeIdIterator.next()); + } + + final VotingConfiguration votingConfiguration = new VotingConfiguration(nodeIds); + assert votingConfiguration.hasQuorum( + discoveryNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())); + assert 2 * minimumMasterNodes - 2 <= nodeIds.size() : nodeIds + " too small for " + minimumMasterNodes; + + initialConfigurationConsumer.accept(votingConfiguration); } catch (Exception e) { logger.debug("exception during bootstrapping upgrade, retrying", e); } finally { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java index e8cd6911297..6ac753b5bc6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/Zen1IT.java @@ -22,9 +22,11 @@ import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclu import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -34,13 +36,19 @@ import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDeci import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.MetaStateService; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster.RestartCallback; import org.elasticsearch.test.discovery.TestZenDiscovery; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -48,10 +56,14 @@ import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; +import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME; +import static org.elasticsearch.cluster.coordination.JoinHelper.START_JOIN_ACTION_NAME; +import static org.elasticsearch.cluster.coordination.PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME; import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING; import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.test.InternalTestCluster.REMOVED_MINIMUM_MASTER_NODES; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -67,6 +79,10 @@ public class Zen1IT extends ESIntegTestCase { .put(TestZenDiscovery.USE_ZEN2.getKey(), true) .build(); + protected Collection> nodePlugins() { + return Collections.singletonList(MockTransportService.TestPlugin.class); + } + public void testZen2NodesJoiningZen1Cluster() { internalCluster().startNodes(randomIntBetween(1, 3), ZEN1_SETTINGS); internalCluster().startNodes(randomIntBetween(1, 3), ZEN2_SETTINGS); @@ -79,6 +95,56 @@ public class Zen1IT extends ESIntegTestCase { createIndex("test"); } + public void testMixedClusterDisruption() throws Exception { + final List nodes = internalCluster().startNodes(IntStream.range(0, 5) + .mapToObj(i -> i < 2 ? ZEN1_SETTINGS : ZEN2_SETTINGS).toArray(Settings[]::new)); + + final List transportServices = nodes.stream() + .map(n -> (MockTransportService) internalCluster().getInstance(TransportService.class, n)).collect(Collectors.toList()); + + logger.info("--> disrupting communications"); + + // The idea here is to make some of the Zen2 nodes believe the Zen1 nodes have gone away by introducing a network partition, so that + // they bootstrap themselves, but keep the Zen1 side of the cluster alive. + + // Set up a bridged network partition with the Zen1 nodes {0,1} on one side, Zen2 nodes {3,4} on the other, and node {2} in both + transportServices.get(0).addFailToSendNoConnectRule(transportServices.get(3)); + transportServices.get(0).addFailToSendNoConnectRule(transportServices.get(4)); + transportServices.get(1).addFailToSendNoConnectRule(transportServices.get(3)); + transportServices.get(1).addFailToSendNoConnectRule(transportServices.get(4)); + transportServices.get(3).addFailToSendNoConnectRule(transportServices.get(0)); + transportServices.get(3).addFailToSendNoConnectRule(transportServices.get(1)); + transportServices.get(4).addFailToSendNoConnectRule(transportServices.get(0)); + transportServices.get(4).addFailToSendNoConnectRule(transportServices.get(1)); + + // Nodes 3 and 4 will bootstrap, but we want to keep node 2 as part of the Zen1 cluster, so prevent any messages that might switch + // its allegiance + transportServices.get(3).addFailToSendNoConnectRule(transportServices.get(2), + PUBLISH_STATE_ACTION_NAME, FOLLOWER_CHECK_ACTION_NAME, START_JOIN_ACTION_NAME); + transportServices.get(4).addFailToSendNoConnectRule(transportServices.get(2), + PUBLISH_STATE_ACTION_NAME, FOLLOWER_CHECK_ACTION_NAME, START_JOIN_ACTION_NAME); + + logger.info("--> waiting for disconnected nodes to be removed"); + ensureStableCluster(3, nodes.get(0)); + + logger.info("--> creating index on Zen1 side"); + assertAcked(client(nodes.get(0)).admin().indices().create(new CreateIndexRequest("test")).get()); + assertFalse(client(nodes.get(0)).admin().cluster().health(new ClusterHealthRequest("test") + .waitForGreenStatus()).get().isTimedOut()); + + logger.info("--> waiting for disconnected nodes to bootstrap themselves"); + assertBusy(() -> assertTrue(IntStream.range(3, 5) + .mapToObj(n -> (Coordinator) internalCluster().getInstance(Discovery.class, nodes.get(n))) + .anyMatch(Coordinator::isInitialConfigurationSet))); + + logger.info("--> clearing disruption and waiting for cluster to reform"); + transportServices.forEach(MockTransportService::clearAllRules); + + ensureStableCluster(5, nodes.get(0)); + assertFalse(client(nodes.get(0)).admin().cluster().health(new ClusterHealthRequest("test") + .waitForGreenStatus()).get().isTimedOut()); + } + public void testMixedClusterFormation() throws Exception { final int zen1NodeCount = randomIntBetween(1, 3); final int zen2NodeCount = randomIntBetween(zen1NodeCount == 1 ? 2 : 1, 3);