diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 4a5f6d05544..92483167bb7 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -230,7 +230,8 @@ public abstract class PeerFinder { private List getFoundPeersUnderLock() { assert holdsLock() : "PeerFinder mutex not held"; - return peersByAddress.values().stream().map(Peer::getDiscoveryNode).filter(Objects::nonNull).collect(Collectors.toList()); + return peersByAddress.values().stream() + .map(Peer::getDiscoveryNode).filter(Objects::nonNull).distinct().collect(Collectors.toList()); } private Peer createConnectingPeer(TransportAddress transportAddress) { diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index d90b23dd4ab..a7b8490cd9e 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -47,8 +47,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; @@ -92,11 +94,15 @@ public class PeerFinderTests extends ESTestCase { private static long CONNECTION_TIMEOUT_MILLIS = 30000; class MockTransportAddressConnector implements TransportAddressConnector { - final Set reachableNodes = new HashSet<>(); + final Map reachableNodes = new HashMap<>(); final Set unreachableAddresses = new HashSet<>(); final Set slowAddresses = new HashSet<>(); final Set inFlightConnectionAttempts = new HashSet<>(); + void addReachableNode(DiscoveryNode node) { + reachableNodes.put(node.getAddress(), node); + } + @Override public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener listener) { assert localNode.getAddress().equals(transportAddress) == false : "should not probe local node"; @@ -116,8 +122,9 @@ public class PeerFinderTests extends ESTestCase { return; } - for (final DiscoveryNode discoveryNode : reachableNodes) { - if (discoveryNode.getAddress().equals(transportAddress)) { + for (final Map.Entry addressAndNode : reachableNodes.entrySet()) { + if (addressAndNode.getKey().equals(transportAddress)) { + final DiscoveryNode discoveryNode = addressAndNode.getValue(); if (discoveryNode.isMasterNode()) { disconnectedNodes.remove(discoveryNode); connectedNodes.add(discoveryNode); @@ -235,7 +242,22 @@ public class PeerFinderTests extends ESTestCase { public void testAddsReachableNodesFromUnicastHostsList() { final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); + + peerFinder.activate(lastAcceptedNodes); + runAllRunnableTasks(); + + assertFoundPeers(otherNode); + } + + public void testDoesNotReturnDuplicateNodesWithDistinctAddresses() { + final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); + final TransportAddress alternativeAddress = buildNewFakeTransportAddress(); + + providedAddresses.add(otherNode.getAddress()); + providedAddresses.add(alternativeAddress); + transportAddressConnector.addReachableNode(otherNode); + transportAddressConnector.reachableNodes.put(alternativeAddress, otherNode); peerFinder.activate(lastAcceptedNodes); runAllRunnableTasks(); @@ -246,7 +268,7 @@ public class PeerFinderTests extends ESTestCase { public void testAddsReachableNodesFromUnicastHostsListProvidedLater() { final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); addressResolveDelay = 10000; peerFinder.activate(lastAcceptedNodes); @@ -265,7 +287,7 @@ public class PeerFinderTests extends ESTestCase { public void testDoesNotRequireAddressResolutionToSucceed() { final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); addressResolveDelay = -1; peerFinder.activate(lastAcceptedNodes); @@ -297,7 +319,7 @@ public class PeerFinderTests extends ESTestCase { emptyMap(), emptySet(), Version.CURRENT); providedAddresses.add(nonMasterNode.getAddress()); - transportAddressConnector.reachableNodes.add(nonMasterNode); + transportAddressConnector.addReachableNode(nonMasterNode); peerFinder.activate(lastAcceptedNodes); runAllRunnableTasks(); @@ -314,7 +336,7 @@ public class PeerFinderTests extends ESTestCase { final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); deterministicTaskQueue.advanceTime(); runAllRunnableTasks(); @@ -325,7 +347,7 @@ public class PeerFinderTests extends ESTestCase { public void testDeactivationClearsPastKnowledge() { final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); peerFinder.activate(lastAcceptedNodes); runAllRunnableTasks(); @@ -343,7 +365,7 @@ public class PeerFinderTests extends ESTestCase { public void testAddsReachableNodesFromClusterState() { final DiscoveryNode otherNode = newDiscoveryNode("node-in-cluster-state"); updateLastAcceptedNodes(b -> b.add(otherNode)); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); peerFinder.activate(lastAcceptedNodes); runAllRunnableTasks(); @@ -365,8 +387,8 @@ public class PeerFinderTests extends ESTestCase { final DiscoveryNode sourceNode = newDiscoveryNode("request-source"); final DiscoveryNode otherKnownNode = newDiscoveryNode("other-known-node"); - transportAddressConnector.reachableNodes.add(sourceNode); - transportAddressConnector.reachableNodes.add(otherKnownNode); + transportAddressConnector.addReachableNode(sourceNode); + transportAddressConnector.addReachableNode(otherKnownNode); peerFinder.activate(lastAcceptedNodes); peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.singletonList(otherKnownNode))); @@ -379,7 +401,7 @@ public class PeerFinderTests extends ESTestCase { final DiscoveryNode sourceNode = newDiscoveryNode("request-source"); final DiscoveryNode otherKnownNode = newDiscoveryNode("other-known-node"); - transportAddressConnector.reachableNodes.add(sourceNode); + transportAddressConnector.addReachableNode(sourceNode); transportAddressConnector.unreachableAddresses.add(otherKnownNode.getAddress()); peerFinder.activate(lastAcceptedNodes); @@ -394,7 +416,7 @@ public class PeerFinderTests extends ESTestCase { final DiscoveryNode otherKnownNode = newDiscoveryNode("other-known-node"); transportAddressConnector.unreachableAddresses.add(sourceNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherKnownNode); + transportAddressConnector.addReachableNode(otherKnownNode); peerFinder.activate(lastAcceptedNodes); peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.singletonList(otherKnownNode))); @@ -406,7 +428,7 @@ public class PeerFinderTests extends ESTestCase { public void testRespondsToRequestWhenActive() { final DiscoveryNode sourceNode = newDiscoveryNode("request-source"); - transportAddressConnector.reachableNodes.add(sourceNode); + transportAddressConnector.addReachableNode(sourceNode); peerFinder.activate(lastAcceptedNodes); final PeersResponse peersResponse1 = peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.emptyList())); @@ -429,7 +451,7 @@ public class PeerFinderTests extends ESTestCase { public void testDelegatesRequestHandlingWhenInactive() { final DiscoveryNode masterNode = newDiscoveryNode("master-node"); final DiscoveryNode sourceNode = newDiscoveryNode("request-source"); - transportAddressConnector.reachableNodes.add(sourceNode); + transportAddressConnector.addReachableNode(sourceNode); peerFinder.activate(DiscoveryNodes.EMPTY_NODES); @@ -445,7 +467,7 @@ public class PeerFinderTests extends ESTestCase { public void testReceivesRequestsFromTransportService() { final DiscoveryNode sourceNode = newDiscoveryNode("request-source"); - transportAddressConnector.reachableNodes.add(sourceNode); + transportAddressConnector.addReachableNode(sourceNode); peerFinder.activate(lastAcceptedNodes); @@ -485,7 +507,7 @@ public class PeerFinderTests extends ESTestCase { public void testRequestsPeersIncludingKnownPeersInRequest() { final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); peerFinder.activate(lastAcceptedNodes); runAllRunnableTasks(); @@ -501,7 +523,7 @@ public class PeerFinderTests extends ESTestCase { public void testAddsReachablePeersFromResponse() { final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); peerFinder.activate(lastAcceptedNodes); runAllRunnableTasks(); @@ -509,7 +531,7 @@ public class PeerFinderTests extends ESTestCase { assertFoundPeers(otherNode); final DiscoveryNode discoveredNode = newDiscoveryNode("discovered-node"); - transportAddressConnector.reachableNodes.add(discoveredNode); + transportAddressConnector.addReachableNode(discoveredNode); respondToRequests(node -> { assertThat(node, is(otherNode)); return new PeersResponse(Optional.empty(), singletonList(discoveredNode), randomNonNegativeLong()); @@ -522,7 +544,7 @@ public class PeerFinderTests extends ESTestCase { public void testAddsReachableMasterFromResponse() { final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); peerFinder.activate(lastAcceptedNodes); runAllRunnableTasks(); @@ -535,7 +557,7 @@ public class PeerFinderTests extends ESTestCase { return new PeersResponse(Optional.of(discoveredMaster), emptyList(), randomNonNegativeLong()); }); - transportAddressConnector.reachableNodes.add(discoveredMaster); + transportAddressConnector.addReachableNode(discoveredMaster); runAllRunnableTasks(); assertFoundPeers(otherNode, discoveredMaster); assertThat(peerFinder.discoveredMasterNode, nullValue()); @@ -545,7 +567,7 @@ public class PeerFinderTests extends ESTestCase { public void testHandlesDiscoveryOfMasterFromResponseFromMaster() { final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); peerFinder.activate(lastAcceptedNodes); runAllRunnableTasks(); @@ -566,7 +588,7 @@ public class PeerFinderTests extends ESTestCase { public void testOnlyRequestsPeersOncePerRoundButDoesRetryNextRound() { final DiscoveryNode sourceNode = newDiscoveryNode("request-source"); - transportAddressConnector.reachableNodes.add(sourceNode); + transportAddressConnector.addReachableNode(sourceNode); peerFinder.activate(lastAcceptedNodes); peerFinder.handlePeersRequest(new PeersRequest(sourceNode, emptyList())); @@ -585,7 +607,7 @@ public class PeerFinderTests extends ESTestCase { }); final DiscoveryNode otherNode = newDiscoveryNode("otherNode"); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); deterministicTaskQueue.advanceTime(); runAllRunnableTasks(); @@ -600,7 +622,7 @@ public class PeerFinderTests extends ESTestCase { public void testDoesNotReconnectToNodesOnceConnected() { final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); peerFinder.activate(lastAcceptedNodes); runAllRunnableTasks(); @@ -617,7 +639,7 @@ public class PeerFinderTests extends ESTestCase { public void testDiscardsDisconnectedNodes() { final DiscoveryNode otherNode = newDiscoveryNode("original-node"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); peerFinder.activate(lastAcceptedNodes); runAllRunnableTasks(); @@ -650,7 +672,7 @@ public class PeerFinderTests extends ESTestCase { transportAddressConnector.slowAddresses.clear(); transportAddressConnector.unreachableAddresses.clear(); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); while (deterministicTaskQueue.getCurrentTimeMillis() < CONNECTION_TIMEOUT_MILLIS) { assertFoundPeers(); @@ -674,8 +696,8 @@ public class PeerFinderTests extends ESTestCase { final DiscoveryNode nodeToFind = newDiscoveryNode("node-to-find"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); - transportAddressConnector.reachableNodes.add(nodeToFind); + transportAddressConnector.addReachableNode(otherNode); + transportAddressConnector.addReachableNode(nodeToFind); peerFinder.activate(lastAcceptedNodes); @@ -717,7 +739,7 @@ public class PeerFinderTests extends ESTestCase { public void testReconnectsToDisconnectedNodes() { final DiscoveryNode otherNode = newDiscoveryNode("original-node"); providedAddresses.add(otherNode.getAddress()); - transportAddressConnector.reachableNodes.add(otherNode); + transportAddressConnector.addReachableNode(otherNode); peerFinder.activate(lastAcceptedNodes); runAllRunnableTasks(); @@ -726,7 +748,7 @@ public class PeerFinderTests extends ESTestCase { transportAddressConnector.reachableNodes.clear(); final DiscoveryNode rebootedOtherNode = new DiscoveryNode("rebooted-node", otherNode.getAddress(), Version.CURRENT); - transportAddressConnector.reachableNodes.add(rebootedOtherNode); + transportAddressConnector.addReachableNode(rebootedOtherNode); connectedNodes.remove(otherNode); disconnectedNodes.add(otherNode); @@ -749,9 +771,12 @@ public class PeerFinderTests extends ESTestCase { } private void assertFoundPeers(DiscoveryNode... expectedNodesArray) { - final Stream expectedNodes = Arrays.stream(expectedNodesArray); - final Stream actualNodes = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false); - assertThat(actualNodes.collect(Collectors.toSet()), equalTo(expectedNodes.collect(Collectors.toSet()))); + final Set expectedNodes = Arrays.stream(expectedNodesArray).collect(Collectors.toSet()); + final List actualNodesList + = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList()); + final HashSet actualNodesSet = new HashSet<>(actualNodesList); + assertThat(actualNodesSet, equalTo(expectedNodes)); + assertTrue("no duplicates in " + actualNodesList, actualNodesSet.size() == actualNodesList.size()); assertNotifiedOfAllUpdates(); }