Publish to masters first (#37673)

Prefer publishing to master-eligible nodes first, so that cluster state updates are committed more
quickly and master-eligible nodes are also turned into followers more quickly after a leader election.
This commit is contained in:
Yannick Welsch 2019-01-22 13:53:10 +01:00 committed by GitHub
parent 3fad1eeaed
commit 23ba900840
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 118 additions and 5 deletions

View File

@ -135,7 +135,7 @@ public class FollowersChecker {
followerCheckers.keySet().removeIf(isUnknownNode);
faultyNodes.removeIf(isUnknownNode);
for (final DiscoveryNode discoveryNode : discoveryNodes) {
discoveryNodes.mastersFirstStream().forEach(discoveryNode -> {
if (discoveryNode.equals(discoveryNodes.getLocalNode()) == false
&& followerCheckers.containsKey(discoveryNode) == false
&& faultyNodes.contains(discoveryNode) == false) {
@ -144,7 +144,7 @@ public class FollowersChecker {
followerCheckers.put(discoveryNode, followerChecker);
followerChecker.start();
}
}
});
}
}

View File

@ -58,7 +58,7 @@ public abstract class Publication {
startTime = currentTimeSupplier.getAsLong();
applyCommitRequest = Optional.empty();
publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size());
publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n)));
publishRequest.getAcceptedState().getNodes().mastersFirstStream().forEach(n -> publicationTargets.add(new PublicationTarget(n)));
}
public void start(Set<DiscoveryNode> faultyNodes) {

View File

@ -40,6 +40,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
@ -161,6 +162,14 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
return nodes.build();
}
/**
 * Streams all nodes, ordered so that the entries of {@code masterNodes} come first and are followed by
 * every node of this collection for which {@link DiscoveryNode#isMasterNode()} is {@code false}.
 *
 * @return a sequential stream with the master nodes at the front
 */
public Stream<DiscoveryNode> mastersFirstStream() {
    // Values of the master-node map, in that map's iteration order.
    final Stream<DiscoveryNode> masters = StreamSupport.stream(masterNodes.spliterator(), false).map(entry -> entry.value);
    // Everything else: iterate this collection and drop the nodes already covered above.
    final Stream<DiscoveryNode> nonMasters = StreamSupport.stream(this.spliterator(), false)
        .filter(node -> node.isMasterNode() == false);
    return Stream.concat(masters, nonMasters);
}
/**
* Get a node by its id
*

View File

@ -31,7 +31,9 @@ import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportException;
@ -41,12 +43,21 @@ import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME;
@ -62,6 +73,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Mockito.mock;
public class FollowersCheckerTests extends ESTestCase {
@ -536,6 +548,46 @@ public class FollowersCheckerTests extends ESTestCase {
}
}
/**
 * Checks that, after {@code setCurrentNodes} is called on a {@link FollowersChecker}, the requests captured
 * by the transport target nodes in masters-first order: every node whose {@code isMasterNode()} is
 * {@code true} precedes every node for which it is {@code false}.
 * <p>
 * The method is {@code public} like the other test methods so that the test runner picks it up.
 */
public void testPreferMasterNodes() {
    List<DiscoveryNode> nodes = randomNodes(10);
    DiscoveryNodes.Builder discoNodesBuilder = DiscoveryNodes.builder();
    nodes.forEach(dn -> discoNodesBuilder.add(dn));
    // The first generated node acts as the local node.
    DiscoveryNodes discoveryNodes = discoNodesBuilder.localNodeId(nodes.get(0).getId()).build();
    CapturingTransport capturingTransport = new CapturingTransport();
    TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, mock(ThreadPool.class),
        TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> nodes.get(0), null, emptySet());
    // Neither callback is expected to fire in this test; trip an assertion if one does.
    final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, transportService, fcr -> {
        assert false : fcr;
    }, (node, reason) -> {
        assert false : node;
    });
    followersChecker.setCurrentNodes(discoveryNodes);
    // Targets of the captured requests, in the order in which they were captured.
    List<DiscoveryNode> followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear())
        .map(cr -> cr.node).collect(Collectors.toList());
    // A stable sort on "is not a master node" leaves the list untouched only if masters already come first.
    List<DiscoveryNode> sortedFollowerTargets = new ArrayList<>(followerTargets);
    Collections.sort(sortedFollowerTargets, Comparator.comparing(n -> n.isMasterNode() == false));
    assertEquals(sortedFollowerTargets, followerTargets);
}
/**
 * Creates {@code numNodes} nodes by calling {@code newNode} with the indices {@code 0 .. numNodes - 1}.
 * Each node frequently gets a {@code "custom"} attribute (either {@code "match"} or a short random string)
 * and is assigned a random subset of all {@link DiscoveryNode.Role} values.
 *
 * @param numNodes number of nodes to create
 * @return the created nodes, in creation order
 */
private static List<DiscoveryNode> randomNodes(final int numNodes) {
    final List<DiscoveryNode> generated = new ArrayList<>();
    for (int nodeIndex = 0; nodeIndex < numNodes; nodeIndex++) {
        final Map<String, String> attrs = new HashMap<>();
        if (frequently()) {
            final String customValue;
            if (randomBoolean()) {
                customValue = "match";
            } else {
                customValue = randomAlphaOfLengthBetween(3, 5);
            }
            attrs.put("custom", customValue);
        }
        // Roles are drawn after the attributes so the sequence of random calls stays the same.
        final Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values())));
        generated.add(newNode(nodeIndex, attrs, roles));
    }
    return generated;
}
/**
 * Builds a {@link DiscoveryNode} for the given index, using {@code "name_" + nodeId} and
 * {@code "node_" + nodeId} as its first two constructor arguments (presumably node name and node id),
 * a fresh fake transport address, the supplied attributes and roles, and {@code Version.CURRENT}.
 */
private static DiscoveryNode newNode(int nodeId, Map<String, String> attributes, Set<DiscoveryNode.Role> roles) {
    final String namePart = "name_" + nodeId;
    final String nodePart = "node_" + nodeId;
    return new DiscoveryNode(namePart, nodePart, buildNewFakeTransportAddress(), attributes, roles, Version.CURRENT);
}
private static class ExpectsSuccess implements TransportResponseHandler<Empty> {
private final AtomicBoolean responseReceived = new AtomicBoolean();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
@ -33,10 +34,13 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -97,8 +101,8 @@ public class PublicationTests extends ESTestCase {
boolean committed;
Map<DiscoveryNode, ActionListener<PublishWithJoinResponse>> pendingPublications = new HashMap<>();
Map<DiscoveryNode, ActionListener<TransportResponse.Empty>> pendingCommits = new HashMap<>();
Map<DiscoveryNode, ActionListener<PublishWithJoinResponse>> pendingPublications = new LinkedHashMap<>();
Map<DiscoveryNode, ActionListener<TransportResponse.Empty>> pendingCommits = new LinkedHashMap<>();
Map<DiscoveryNode, Join> joins = new HashMap<>();
Set<DiscoveryNode> missingJoins = new HashSet<>();
@ -372,6 +376,22 @@ public class PublicationTests extends ESTestCase {
tuple.v1().equals(n2) ? "dummy failure" : "non-failed nodes do not form a quorum")));
}
/**
 * Publishes a cluster state containing ten random nodes plus {@code n1} and checks that the pending
 * publications were registered in masters-first order, i.e. all nodes whose {@code isMasterNode()} is
 * {@code true} come before the remaining nodes.
 */
public void testPublishingToMastersFirst() {
    final VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId()));
    initializeCluster(singleNodeConfig);

    final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
    for (final DiscoveryNode randomNode : randomNodes(10)) {
        nodesBuilder.add(randomNode);
    }
    final DiscoveryNodes discoveryNodes = nodesBuilder.add(n1).localNodeId(n1.getId()).build();

    final ClusterState stateToPublish = CoordinationStateTests.clusterState(1L, 2L,
        discoveryNodes, singleNodeConfig, singleNodeConfig, 42L);
    final MockPublication publication = node1.publish(stateToPublish, null, Collections.emptySet());

    // Key order of pendingPublications is the order in which the publish requests were recorded.
    final List<DiscoveryNode> actualOrder = new ArrayList<>(publication.pendingPublications.keySet());
    // A stable sort on "is not a master node" is a no-op only if masters already lead the list.
    final Comparator<DiscoveryNode> mastersFirst = Comparator.comparing(node -> node.isMasterNode() == false);
    final List<DiscoveryNode> expectedOrder = new ArrayList<>(actualOrder);
    expectedOrder.sort(mastersFirst);
    assertEquals(expectedOrder, actualOrder);
}
public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedException {
VotingConfiguration config = new VotingConfiguration(randomBoolean() ?
Sets.newHashSet(n1.getId(), n2.getId()) : Sets.newHashSet(n1.getId(), n2.getId(), n3.getId()));
@ -428,6 +448,25 @@ public class PublicationTests extends ESTestCase {
assertEquals(discoNodes, ackListener.await(0L, TimeUnit.SECONDS));
}
/**
 * Generates {@code numNodes} nodes, handing the indices {@code 0 .. numNodes - 1} to {@code newNode}.
 * A node frequently carries a {@code "custom"} attribute whose value is {@code "match"} or a random
 * string of 3 to 5 characters, and its roles are a random subset of {@link DiscoveryNode.Role}.
 *
 * @param numNodes how many nodes to generate
 * @return the generated nodes in index order
 */
private static List<DiscoveryNode> randomNodes(final int numNodes) {
    final List<DiscoveryNode> result = new ArrayList<>();
    int index = 0;
    while (index < numNodes) {
        final Map<String, String> nodeAttributes = new HashMap<>();
        final boolean withCustomAttribute = frequently();
        if (withCustomAttribute) {
            final String value = randomBoolean() ? "match" : randomAlphaOfLengthBetween(3, 5);
            nodeAttributes.put("custom", value);
        }
        // Role selection happens after the attribute draw, keeping the random call sequence unchanged.
        final List<DiscoveryNode.Role> allRoles = Arrays.asList(DiscoveryNode.Role.values());
        final Set<DiscoveryNode.Role> chosenRoles = new HashSet<>(randomSubsetOf(allRoles));
        result.add(newNode(index, nodeAttributes, chosenRoles));
        index++;
    }
    return result;
}
/**
 * Creates a {@link DiscoveryNode} whose first two constructor arguments are {@code "name_" + nodeId} and
 * {@code "node_" + nodeId} (presumably its name and id), with a new fake transport address, the given
 * attributes and roles, and {@code Version.CURRENT}.
 */
private static DiscoveryNode newNode(int nodeId, Map<String, String> attributes, Set<DiscoveryNode.Role> roles) {
    final String nameLabel = "name_" + nodeId;
    final String nodeLabel = "node_" + nodeId;
    return new DiscoveryNode(nameLabel, nodeLabel, buildNewFakeTransportAddress(), attributes, roles, Version.CURRENT);
}
public static <T> Collector<T, ?, Stream<T>> shuffle() {
return Collectors.collectingAndThen(Collectors.toList(),
ts -> {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -149,6 +150,18 @@ public class DiscoveryNodesTests extends ESTestCase {
assertThat(resolvedNodesIds, equalTo(expectedNodesIds));
}
/**
 * Verifies that {@code mastersFirstStream()} yields a list with the same size and the same set of nodes as
 * were added to the builder, and that all nodes whose {@code isMasterNode()} is {@code true} come before
 * the others.
 */
public void testMastersFirst() {
    final List<DiscoveryNode> inputNodes = randomNodes(10);
    final DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
    inputNodes.forEach(discoBuilder::add);
    final List<DiscoveryNode> returnedNodes = discoBuilder.build().mastersFirstStream().collect(Collectors.toList());
    // assertEquals takes (expected, actual): the input is the reference, the stream output is under test.
    assertEquals(inputNodes.size(), returnedNodes.size());
    assertEquals(new HashSet<>(inputNodes), new HashSet<>(returnedNodes));
    // A stable sort on "is not a master node" changes nothing only if masters are already at the front.
    final List<DiscoveryNode> sortedNodes = new ArrayList<>(returnedNodes);
    Collections.sort(sortedNodes, Comparator.comparing(n -> n.isMasterNode() == false));
    assertEquals(sortedNodes, returnedNodes);
}
public void testDeltas() {
Set<DiscoveryNode> nodesA = new HashSet<>();
nodesA.addAll(randomNodes(1 + randomInt(10)));