diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index b7f99df31c0..641f3941cb3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -135,7 +135,7 @@ public class FollowersChecker { followerCheckers.keySet().removeIf(isUnknownNode); faultyNodes.removeIf(isUnknownNode); - for (final DiscoveryNode discoveryNode : discoveryNodes) { + discoveryNodes.mastersFirstStream().forEach(discoveryNode -> { if (discoveryNode.equals(discoveryNodes.getLocalNode()) == false && followerCheckers.containsKey(discoveryNode) == false && faultyNodes.contains(discoveryNode) == false) { @@ -144,7 +144,7 @@ public class FollowersChecker { followerCheckers.put(discoveryNode, followerChecker); followerChecker.start(); } - } + }); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 6838c2f996f..4aea820d6d9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -58,7 +58,7 @@ public abstract class Publication { startTime = currentTimeSupplier.getAsLong(); applyCommitRequest = Optional.empty(); publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size()); - publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n))); + publishRequest.getAcceptedState().getNodes().mastersFirstStream().forEach(n -> publicationTargets.add(new PublicationTarget(n))); } public void start(Set faultyNodes) { diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index 2e0e6318691..02f4d5d93bf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -40,6 +40,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Stream; import java.util.stream.StreamSupport; /** @@ -161,6 +162,14 @@ public class DiscoveryNodes extends AbstractDiffable implements return nodes.build(); } + /** + * Returns a stream of all nodes, with master nodes at the front + */ + public Stream mastersFirstStream() { + return Stream.concat(StreamSupport.stream(masterNodes.spliterator(), false).map(cur -> cur.value), + StreamSupport.stream(this.spliterator(), false).filter(n -> n.isMasterNode() == false)); + } + /** * Get a node by its id * diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java index 6d985d46ef6..4f1016847c8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java @@ -31,7 +31,9 @@ import org.elasticsearch.common.settings.Settings.Builder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction; +import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportException; @@ -41,12 +43,21 @@ import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME; @@ -62,6 +73,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.mockito.Mockito.mock; public class FollowersCheckerTests extends ESTestCase { @@ -536,6 +548,46 @@ public class FollowersCheckerTests extends ESTestCase { } } + private void testPreferMasterNodes() { + List nodes = randomNodes(10); + DiscoveryNodes.Builder discoNodesBuilder = DiscoveryNodes.builder(); + nodes.forEach(dn -> discoNodesBuilder.add(dn)); + DiscoveryNodes discoveryNodes = discoNodesBuilder.localNodeId(nodes.get(0).getId()).build(); + CapturingTransport capturingTransport = new CapturingTransport(); + TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, mock(ThreadPool.class), + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> nodes.get(0), null, emptySet()); + final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, transportService, fcr -> { + assert false : fcr; + }, (node, reason) -> { + assert false : node; + }); + followersChecker.setCurrentNodes(discoveryNodes); + List followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear()) + .map(cr -> cr.node).collect(Collectors.toList()); + List sortedFollowerTargets = new ArrayList<>(followerTargets); + Collections.sort(sortedFollowerTargets, Comparator.comparing(n -> n.isMasterNode() == false)); + assertEquals(sortedFollowerTargets, followerTargets); + } + + private static List randomNodes(final int numNodes) { + List nodesList = new ArrayList<>(); + for (int i = 0; i < numNodes; i++) { + Map attributes = new HashMap<>(); + if (frequently()) { + attributes.put("custom", randomBoolean() ? "match" : randomAlphaOfLengthBetween(3, 5)); + } + final DiscoveryNode node = newNode(i, attributes, + new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values())))); + nodesList.add(node); + } + return nodesList; + } + + private static DiscoveryNode newNode(int nodeId, Map attributes, Set roles) { + return new DiscoveryNode("name_" + nodeId, "node_" + nodeId, buildNewFakeTransportAddress(), attributes, roles, + Version.CURRENT); + } + private static class ExpectsSuccess implements TransportResponseHandler { private final AtomicBoolean responseReceived = new AtomicBoolean(); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 6f506bbfc91..658250bc7a4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.coordination; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; @@ -33,10 +34,13 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -97,8 +101,8 @@ public class PublicationTests extends ESTestCase { boolean committed; - Map> pendingPublications = new HashMap<>(); - Map> pendingCommits = new HashMap<>(); + Map> pendingPublications = new LinkedHashMap<>(); + Map> pendingCommits = new LinkedHashMap<>(); Map joins = new HashMap<>(); Set missingJoins = new HashSet<>(); @@ -372,6 +376,22 @@ public class PublicationTests extends ESTestCase { tuple.v1().equals(n2) ? "dummy failure" : "non-failed nodes do not form a quorum"))); } + public void testPublishingToMastersFirst() { + VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId())); + initializeCluster(singleNodeConfig); + + DiscoveryNodes.Builder discoNodesBuilder = DiscoveryNodes.builder(); + randomNodes(10).forEach(dn -> discoNodesBuilder.add(dn)); + DiscoveryNodes discoveryNodes = discoNodesBuilder.add(n1).localNodeId(n1.getId()).build(); + MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, + discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), null, Collections.emptySet()); + + List publicationTargets = new ArrayList<>(publication.pendingPublications.keySet()); + List sortedPublicationTargets = new ArrayList<>(publicationTargets); + Collections.sort(sortedPublicationTargets, Comparator.comparing(n -> n.isMasterNode() == false)); + assertEquals(sortedPublicationTargets, publicationTargets); + } + public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedException { VotingConfiguration config = new VotingConfiguration(randomBoolean() ? Sets.newHashSet(n1.getId(), n2.getId()) : Sets.newHashSet(n1.getId(), n2.getId(), n3.getId())); @@ -428,6 +448,25 @@ public class PublicationTests extends ESTestCase { assertEquals(discoNodes, ackListener.await(0L, TimeUnit.SECONDS)); } + private static List randomNodes(final int numNodes) { + List nodesList = new ArrayList<>(); + for (int i = 0; i < numNodes; i++) { + Map attributes = new HashMap<>(); + if (frequently()) { + attributes.put("custom", randomBoolean() ? "match" : randomAlphaOfLengthBetween(3, 5)); + } + final DiscoveryNode node = newNode(i, attributes, + new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values())))); + nodesList.add(node); + } + return nodesList; + } + + private static DiscoveryNode newNode(int nodeId, Map attributes, Set roles) { + return new DiscoveryNode("name_" + nodeId, "node_" + nodeId, buildNewFakeTransportAddress(), attributes, roles, + Version.CURRENT); + } + public static Collector> shuffle() { return Collectors.collectingAndThen(Collectors.toList(), ts -> { diff --git a/server/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodesTests.java b/server/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodesTests.java index cf0b1eebe98..5c92029520e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodesTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -149,6 +150,18 @@ public class DiscoveryNodesTests extends ESTestCase { assertThat(resolvedNodesIds, equalTo(expectedNodesIds)); } + public void testMastersFirst() { + final List inputNodes = randomNodes(10); + final DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + inputNodes.forEach(discoBuilder::add); + final List returnedNodes = discoBuilder.build().mastersFirstStream().collect(Collectors.toList()); + assertEquals(returnedNodes.size(), inputNodes.size()); + assertEquals(new HashSet<>(returnedNodes), new HashSet<>(inputNodes)); + final List sortedNodes = new ArrayList<>(returnedNodes); + Collections.sort(sortedNodes, Comparator.comparing(n -> n.isMasterNode() == false)); + assertEquals(sortedNodes, returnedNodes); + } + public void testDeltas() { Set nodesA = new HashSet<>(); nodesA.addAll(randomNodes(1 + randomInt(10)));