[Zen2] Remove duplicate discovered peers (#35505)

Today the `PeerFinder` probes each address it obtains, identifies the node to
which it just connected, and then returns all such nodes. However, this can
lead to duplicates if a node manages to connect to another node via two
distinct addresses.  This causes bootstrapping to fail since
`BootstrapConfiguration#resolve` forbids duplicates.

This change alters the behaviour of the `PeerFinder` to remove duplicates in
this situation.
This commit is contained in:
David Turner 2018-11-13 22:30:36 +00:00 committed by GitHub
parent 8e40a2bbe2
commit 229637fd7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 62 additions and 36 deletions

View File

@ -230,7 +230,8 @@ public abstract class PeerFinder {
private List<DiscoveryNode> getFoundPeersUnderLock() {
assert holdsLock() : "PeerFinder mutex not held";
return peersByAddress.values().stream().map(Peer::getDiscoveryNode).filter(Objects::nonNull).collect(Collectors.toList());
return peersByAddress.values().stream()
.map(Peer::getDiscoveryNode).filter(Objects::nonNull).distinct().collect(Collectors.toList());
}
private Peer createConnectingPeer(TransportAddress transportAddress) {

View File

@ -47,8 +47,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
@ -92,11 +94,15 @@ public class PeerFinderTests extends ESTestCase {
private static long CONNECTION_TIMEOUT_MILLIS = 30000;
class MockTransportAddressConnector implements TransportAddressConnector {
final Set<DiscoveryNode> reachableNodes = new HashSet<>();
final Map<TransportAddress, DiscoveryNode> reachableNodes = new HashMap<>();
final Set<TransportAddress> unreachableAddresses = new HashSet<>();
final Set<TransportAddress> slowAddresses = new HashSet<>();
final Set<TransportAddress> inFlightConnectionAttempts = new HashSet<>();
void addReachableNode(DiscoveryNode node) {
reachableNodes.put(node.getAddress(), node);
}
@Override
public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener) {
assert localNode.getAddress().equals(transportAddress) == false : "should not probe local node";
@ -116,8 +122,9 @@ public class PeerFinderTests extends ESTestCase {
return;
}
for (final DiscoveryNode discoveryNode : reachableNodes) {
if (discoveryNode.getAddress().equals(transportAddress)) {
for (final Map.Entry<TransportAddress, DiscoveryNode> addressAndNode : reachableNodes.entrySet()) {
if (addressAndNode.getKey().equals(transportAddress)) {
final DiscoveryNode discoveryNode = addressAndNode.getValue();
if (discoveryNode.isMasterNode()) {
disconnectedNodes.remove(discoveryNode);
connectedNodes.add(discoveryNode);
@ -235,7 +242,22 @@ public class PeerFinderTests extends ESTestCase {
public void testAddsReachableNodesFromUnicastHostsList() {
final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
assertFoundPeers(otherNode);
}
public void testDoesNotReturnDuplicateNodesWithDistinctAddresses() {
final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
final TransportAddress alternativeAddress = buildNewFakeTransportAddress();
providedAddresses.add(otherNode.getAddress());
providedAddresses.add(alternativeAddress);
transportAddressConnector.addReachableNode(otherNode);
transportAddressConnector.reachableNodes.put(alternativeAddress, otherNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
@ -246,7 +268,7 @@ public class PeerFinderTests extends ESTestCase {
public void testAddsReachableNodesFromUnicastHostsListProvidedLater() {
final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
addressResolveDelay = 10000;
peerFinder.activate(lastAcceptedNodes);
@ -265,7 +287,7 @@ public class PeerFinderTests extends ESTestCase {
public void testDoesNotRequireAddressResolutionToSucceed() {
final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
addressResolveDelay = -1;
peerFinder.activate(lastAcceptedNodes);
@ -297,7 +319,7 @@ public class PeerFinderTests extends ESTestCase {
emptyMap(), emptySet(), Version.CURRENT);
providedAddresses.add(nonMasterNode.getAddress());
transportAddressConnector.reachableNodes.add(nonMasterNode);
transportAddressConnector.addReachableNode(nonMasterNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
@ -314,7 +336,7 @@ public class PeerFinderTests extends ESTestCase {
final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
deterministicTaskQueue.advanceTime();
runAllRunnableTasks();
@ -325,7 +347,7 @@ public class PeerFinderTests extends ESTestCase {
public void testDeactivationClearsPastKnowledge() {
final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
@ -343,7 +365,7 @@ public class PeerFinderTests extends ESTestCase {
public void testAddsReachableNodesFromClusterState() {
final DiscoveryNode otherNode = newDiscoveryNode("node-in-cluster-state");
updateLastAcceptedNodes(b -> b.add(otherNode));
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
@ -365,8 +387,8 @@ public class PeerFinderTests extends ESTestCase {
final DiscoveryNode sourceNode = newDiscoveryNode("request-source");
final DiscoveryNode otherKnownNode = newDiscoveryNode("other-known-node");
transportAddressConnector.reachableNodes.add(sourceNode);
transportAddressConnector.reachableNodes.add(otherKnownNode);
transportAddressConnector.addReachableNode(sourceNode);
transportAddressConnector.addReachableNode(otherKnownNode);
peerFinder.activate(lastAcceptedNodes);
peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.singletonList(otherKnownNode)));
@ -379,7 +401,7 @@ public class PeerFinderTests extends ESTestCase {
final DiscoveryNode sourceNode = newDiscoveryNode("request-source");
final DiscoveryNode otherKnownNode = newDiscoveryNode("other-known-node");
transportAddressConnector.reachableNodes.add(sourceNode);
transportAddressConnector.addReachableNode(sourceNode);
transportAddressConnector.unreachableAddresses.add(otherKnownNode.getAddress());
peerFinder.activate(lastAcceptedNodes);
@ -394,7 +416,7 @@ public class PeerFinderTests extends ESTestCase {
final DiscoveryNode otherKnownNode = newDiscoveryNode("other-known-node");
transportAddressConnector.unreachableAddresses.add(sourceNode.getAddress());
transportAddressConnector.reachableNodes.add(otherKnownNode);
transportAddressConnector.addReachableNode(otherKnownNode);
peerFinder.activate(lastAcceptedNodes);
peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.singletonList(otherKnownNode)));
@ -406,7 +428,7 @@ public class PeerFinderTests extends ESTestCase {
public void testRespondsToRequestWhenActive() {
final DiscoveryNode sourceNode = newDiscoveryNode("request-source");
transportAddressConnector.reachableNodes.add(sourceNode);
transportAddressConnector.addReachableNode(sourceNode);
peerFinder.activate(lastAcceptedNodes);
final PeersResponse peersResponse1 = peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.emptyList()));
@ -429,7 +451,7 @@ public class PeerFinderTests extends ESTestCase {
public void testDelegatesRequestHandlingWhenInactive() {
final DiscoveryNode masterNode = newDiscoveryNode("master-node");
final DiscoveryNode sourceNode = newDiscoveryNode("request-source");
transportAddressConnector.reachableNodes.add(sourceNode);
transportAddressConnector.addReachableNode(sourceNode);
peerFinder.activate(DiscoveryNodes.EMPTY_NODES);
@ -445,7 +467,7 @@ public class PeerFinderTests extends ESTestCase {
public void testReceivesRequestsFromTransportService() {
final DiscoveryNode sourceNode = newDiscoveryNode("request-source");
transportAddressConnector.reachableNodes.add(sourceNode);
transportAddressConnector.addReachableNode(sourceNode);
peerFinder.activate(lastAcceptedNodes);
@ -485,7 +507,7 @@ public class PeerFinderTests extends ESTestCase {
public void testRequestsPeersIncludingKnownPeersInRequest() {
final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
@ -501,7 +523,7 @@ public class PeerFinderTests extends ESTestCase {
public void testAddsReachablePeersFromResponse() {
final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
@ -509,7 +531,7 @@ public class PeerFinderTests extends ESTestCase {
assertFoundPeers(otherNode);
final DiscoveryNode discoveredNode = newDiscoveryNode("discovered-node");
transportAddressConnector.reachableNodes.add(discoveredNode);
transportAddressConnector.addReachableNode(discoveredNode);
respondToRequests(node -> {
assertThat(node, is(otherNode));
return new PeersResponse(Optional.empty(), singletonList(discoveredNode), randomNonNegativeLong());
@ -522,7 +544,7 @@ public class PeerFinderTests extends ESTestCase {
public void testAddsReachableMasterFromResponse() {
final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
@ -535,7 +557,7 @@ public class PeerFinderTests extends ESTestCase {
return new PeersResponse(Optional.of(discoveredMaster), emptyList(), randomNonNegativeLong());
});
transportAddressConnector.reachableNodes.add(discoveredMaster);
transportAddressConnector.addReachableNode(discoveredMaster);
runAllRunnableTasks();
assertFoundPeers(otherNode, discoveredMaster);
assertThat(peerFinder.discoveredMasterNode, nullValue());
@ -545,7 +567,7 @@ public class PeerFinderTests extends ESTestCase {
public void testHandlesDiscoveryOfMasterFromResponseFromMaster() {
final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
@ -566,7 +588,7 @@ public class PeerFinderTests extends ESTestCase {
public void testOnlyRequestsPeersOncePerRoundButDoesRetryNextRound() {
final DiscoveryNode sourceNode = newDiscoveryNode("request-source");
transportAddressConnector.reachableNodes.add(sourceNode);
transportAddressConnector.addReachableNode(sourceNode);
peerFinder.activate(lastAcceptedNodes);
peerFinder.handlePeersRequest(new PeersRequest(sourceNode, emptyList()));
@ -585,7 +607,7 @@ public class PeerFinderTests extends ESTestCase {
});
final DiscoveryNode otherNode = newDiscoveryNode("otherNode");
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
deterministicTaskQueue.advanceTime();
runAllRunnableTasks();
@ -600,7 +622,7 @@ public class PeerFinderTests extends ESTestCase {
public void testDoesNotReconnectToNodesOnceConnected() {
final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
@ -617,7 +639,7 @@ public class PeerFinderTests extends ESTestCase {
public void testDiscardsDisconnectedNodes() {
final DiscoveryNode otherNode = newDiscoveryNode("original-node");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
@ -650,7 +672,7 @@ public class PeerFinderTests extends ESTestCase {
transportAddressConnector.slowAddresses.clear();
transportAddressConnector.unreachableAddresses.clear();
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
while (deterministicTaskQueue.getCurrentTimeMillis() < CONNECTION_TIMEOUT_MILLIS) {
assertFoundPeers();
@ -674,8 +696,8 @@ public class PeerFinderTests extends ESTestCase {
final DiscoveryNode nodeToFind = newDiscoveryNode("node-to-find");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.reachableNodes.add(nodeToFind);
transportAddressConnector.addReachableNode(otherNode);
transportAddressConnector.addReachableNode(nodeToFind);
peerFinder.activate(lastAcceptedNodes);
@ -717,7 +739,7 @@ public class PeerFinderTests extends ESTestCase {
public void testReconnectsToDisconnectedNodes() {
final DiscoveryNode otherNode = newDiscoveryNode("original-node");
providedAddresses.add(otherNode.getAddress());
transportAddressConnector.reachableNodes.add(otherNode);
transportAddressConnector.addReachableNode(otherNode);
peerFinder.activate(lastAcceptedNodes);
runAllRunnableTasks();
@ -726,7 +748,7 @@ public class PeerFinderTests extends ESTestCase {
transportAddressConnector.reachableNodes.clear();
final DiscoveryNode rebootedOtherNode = new DiscoveryNode("rebooted-node", otherNode.getAddress(), Version.CURRENT);
transportAddressConnector.reachableNodes.add(rebootedOtherNode);
transportAddressConnector.addReachableNode(rebootedOtherNode);
connectedNodes.remove(otherNode);
disconnectedNodes.add(otherNode);
@ -749,9 +771,12 @@ public class PeerFinderTests extends ESTestCase {
}
private void assertFoundPeers(DiscoveryNode... expectedNodesArray) {
final Stream<DiscoveryNode> expectedNodes = Arrays.stream(expectedNodesArray);
final Stream<DiscoveryNode> actualNodes = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false);
assertThat(actualNodes.collect(Collectors.toSet()), equalTo(expectedNodes.collect(Collectors.toSet())));
final Set<DiscoveryNode> expectedNodes = Arrays.stream(expectedNodesArray).collect(Collectors.toSet());
final List<DiscoveryNode> actualNodesList
= StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList());
final HashSet<DiscoveryNode> actualNodesSet = new HashSet<>(actualNodesList);
assertThat(actualNodesSet, equalTo(expectedNodes));
assertTrue("no duplicates in " + actualNodesList, actualNodesSet.size() == actualNodesList.size());
assertNotifiedOfAllUpdates();
}