diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PeersResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PeersResponse.java
new file mode 100644
index 00000000000..6e2e36697c6
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PeersResponse.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.coordination;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.transport.TransportResponse;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Response to a peers request: carries either the node that the responder believes to be the current
+ * master, or the peers that the responder has found so far, together with the responder's current term.
+ * The peer list must be empty whenever a master node is reported.
+ */
+public class PeersResponse extends TransportResponse {
+    private final Optional<DiscoveryNode> masterNode;
+    private final List<DiscoveryNode> knownPeers;
+    private final long term;
+
+    /**
+     * @param masterNode the node that the responder believes is leading, or empty if it knows of none
+     * @param knownPeers the peers known to the responder; must be empty whenever {@code masterNode} is present
+     * @param term       the current term of the responder
+     */
+    public PeersResponse(Optional<DiscoveryNode> masterNode, List<DiscoveryNode> knownPeers, long term) {
+        assert masterNode.isPresent() == false || knownPeers.isEmpty() : "known peers must be empty if a master node is present";
+        this.masterNode = masterNode;
+        this.knownPeers = knownPeers;
+        this.term = term;
+    }
+
+    /**
+     * Reads a response in the order used by {@link #writeTo}: optional master node, list of peers, term.
+     */
+    public PeersResponse(StreamInput in) throws IOException {
+        masterNode = Optional.ofNullable(in.readOptionalWriteable(DiscoveryNode::new));
+        knownPeers = in.readList(DiscoveryNode::new);
+        term = in.readLong();
+        assert masterNode.isPresent() == false || knownPeers.isEmpty() : "known peers must be empty if a master node is present";
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeOptionalWriteable(masterNode.orElse(null));
+        out.writeList(knownPeers);
+        out.writeLong(term);
+    }
+
+    /**
+     * @return the node that is currently leading, according to the responding node.
+     */
+    public Optional<DiscoveryNode> getMasterNode() {
+        return masterNode;
+    }
+
+    /**
+     * @return the collection of known peers of the responding node, or an empty collection if the responding node believes there
+     *         is currently a leader.
+     */
+    public List<DiscoveryNode> getKnownPeers() {
+        return knownPeers;
+    }
+
+    /**
+     * @return the current term of the responding node. If the responding node is the leader then this is the term in which it is
+     *         currently leading.
+     */
+    public long getTerm() {
+        return term;
+    }
+
+    @Override
+    public String toString() {
+        return "PeersResponse{" +
+            "masterNode=" + masterNode +
+            ", knownPeers=" + knownPeers +
+            ", term=" + term +
+            '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PeersResponse that = (PeersResponse) o;
+        return term == that.term &&
+            Objects.equals(masterNode, that.masterNode) &&
+            Objects.equals(knownPeers, that.knownPeers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(masterNode, knownPeers, term);
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index 03a0f3b42a1..5e3945ba58e 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -54,6 +54,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.discovery.DiscoverySettings;
+import org.elasticsearch.discovery.PeerFinder;
 import org.elasticsearch.discovery.zen.ElectMasterService;
 import org.elasticsearch.discovery.zen.FaultDetection;
 import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
@@ -423,6 +424,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
                     Node.BREAKER_TYPE_KEY,
                     OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
                     IndexGraveyard.SETTING_MAX_TOMBSTONES,
-                    EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING
+                    EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
+                    PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING
             )));
     }
diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
new file mode 100644
index 00000000000..8c230dd93a6
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery;
+
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.coordination.FutureExecutor;
+import org.elasticsearch.cluster.coordination.PeersResponse;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.threadpool.ThreadPool.Names;
+import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportResponseHandler;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
+
+/**
+ * Finds peers while this node has no known leader. While active it repeatedly probes the master nodes of the
+ * last-accepted cluster state and the resolved configured hosts, connects to every address it learns of, and
+ * sends a {@link PeersRequest} to each connected peer to learn of further addresses. When a peer names itself
+ * as the master, {@link #onActiveMasterFound} is called. The activation state and the peer map are guarded by
+ * {@code mutex}.
+ */
+public abstract class PeerFinder extends AbstractComponent {
+
+    public static final String REQUEST_PEERS_ACTION_NAME = "internal:discovery/request_peers";
+
+    // the time between attempts to find all peers
+    public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_SETTING =
+        Setting.timeSetting("discovery.find_peers_interval",
+            TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
+
+    private final TimeValue findPeersDelay;
+
+    private final Object mutex = new Object();
+    private final TransportService transportService;
+    private final FutureExecutor futureExecutor;
+    private final TransportAddressConnector transportAddressConnector;
+    private final ConfiguredHostsResolver configuredHostsResolver;
+
+    private volatile long currentTerm;
+    private boolean active;
+    private DiscoveryNodes lastAcceptedNodes;
+    private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap();
+    private Optional<DiscoveryNode> leader = Optional.empty();
+
+    PeerFinder(Settings settings, TransportService transportService, FutureExecutor futureExecutor,
+               TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) {
+        super(settings);
+        findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
+        this.transportService = transportService;
+        this.futureExecutor = futureExecutor;
+        this.transportAddressConnector = transportAddressConnector;
+        this.configuredHostsResolver = configuredHostsResolver;
+
+        transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
+            PeersRequest::new,
+            (request, channel, task) -> channel.sendResponse(handlePeersRequest(request)));
+    }
+
+    /**
+     * Starts peer finding. Must not already be active. Forgets any recorded leader and immediately runs a
+     * wake-up, which probes the master nodes in {@code lastAcceptedNodes} and the configured hosts.
+     */
+    public void activate(final DiscoveryNodes lastAcceptedNodes) {
+        logger.trace("activating with {}", lastAcceptedNodes);
+
+        synchronized (mutex) {
+            assert active == false;
+            active = true;
+            this.lastAcceptedNodes = lastAcceptedNodes;
+            leader = Optional.empty();
+            handleWakeUp();
+        }
+    }
+
+    /**
+     * Stops peer finding, drops every tracked peer and records {@code leader} as the master node that is
+     * reported to peers which send a request while this finder is inactive.
+     */
+    public void deactivate(DiscoveryNode leader) {
+        synchronized (mutex) {
+            logger.trace("deactivating and setting leader to {}", leader);
+            active = false;
+            handleWakeUp();
+            this.leader = Optional.of(leader);
+            assert assertInactiveWithNoKnownPeers();
+        }
+    }
+
+    // exposed to subclasses for testing
+    protected final boolean holdsLock() {
+        return Thread.holdsLock(mutex);
+    }
+
+    boolean assertInactiveWithNoKnownPeers() {
+        assert active == false;
+        assert peersByAddress.isEmpty();
+        return true;
+    }
+
+    /**
+     * Handles a request from a peer. When active, starts probing the sender and every peer it listed and
+     * replies with the peers found so far; when inactive, replies with the recorded leader and no peers.
+     */
+    PeersResponse handlePeersRequest(PeersRequest peersRequest) {
+        synchronized (mutex) {
+            assert peersRequest.getSourceNode().equals(getLocalNode()) == false;
+            if (active) {
+                startProbe(peersRequest.getSourceNode().getAddress());
+                peersRequest.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(this::startProbe);
+                return new PeersResponse(Optional.empty(), getFoundPeersUnderLock(), currentTerm);
+            } else {
+                return new PeersResponse(leader, Collections.emptyList(), currentTerm);
+            }
+        }
+    }
+
+    public void setCurrentTerm(long currentTerm) {
+        this.currentTerm = currentTerm;
+    }
+
+    private DiscoveryNode getLocalNode() {
+        final DiscoveryNode localNode = transportService.getLocalNode();
+        assert localNode != null;
+        return localNode;
+    }
+
+    /**
+     * Called on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join.
+     */
+    protected abstract void onActiveMasterFound(DiscoveryNode masterNode, long term);
+
+    public interface TransportAddressConnector {
+        /**
+         * Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it.
+         */
+        void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener);
+    }
+
+    public interface ConfiguredHostsResolver {
+        /**
+         * Attempt to resolve the configured unicast hosts list to a list of transport addresses.
+         *
+         * @param consumer Consumer for the resolved list. May not be called if an error occurs or if another resolution attempt is in
+         *                 progress.
+         */
+        void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer);
+    }
+
+    /**
+     * @return the nodes of all peers to which a connection has been established. Asserts that this finder is active.
+     */
+    public Iterable<DiscoveryNode> getFoundPeers() {
+        synchronized (mutex) {
+            return getFoundPeersUnderLock();
+        }
+    }
+
+    private List<DiscoveryNode> getFoundPeersUnderLock() {
+        assert active;
+        assert holdsLock() : "PeerFinder mutex not held";
+        return peersByAddress.values().stream().map(Peer::getDiscoveryNode).filter(Objects::nonNull).collect(Collectors.toList());
+    }
+
+    private Peer createConnectingPeer(TransportAddress transportAddress) {
+        Peer peer = new Peer(transportAddress);
+        peer.establishConnection();
+        return peer;
+    }
+
+    /**
+     * Lets every tracked peer react to the wake-up and then, if still active, probes the known master nodes
+     * and the configured hosts again and schedules the next wake-up after {@code findPeersDelay}.
+     */
+    private void handleWakeUp() {
+        assert holdsLock() : "PeerFinder mutex not held";
+
+        for (final Peer peer : peersByAddress.values()) {
+            peer.handleWakeUp();
+        }
+
+        if (active == false) {
+            logger.trace("not active");
+            return;
+        }
+
+        logger.trace("probing master nodes from cluster state: {}", lastAcceptedNodes);
+        for (ObjectCursor<DiscoveryNode> discoveryNodeObjectCursor : lastAcceptedNodes.getMasterNodes().values()) {
+            startProbe(discoveryNodeObjectCursor.value.getAddress());
+        }
+
+        configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> {
+            synchronized (mutex) {
+                logger.trace("probing resolved transport addresses {}", providedAddresses);
+                providedAddresses.forEach(this::startProbe);
+            }
+        });
+
+        futureExecutor.schedule(new AbstractRunnable() {
+            @Override
+            public boolean isForceExecution() {
+                return true;
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                assert false : e;
+                logger.debug("unexpected exception in wakeup", e);
+            }
+
+            @Override
+            protected void doRun() {
+                synchronized (mutex) {
+                    handleWakeUp();
+                }
+            }
+
+            @Override
+            public String toString() {
+                return "PeerFinder::handleWakeUp";
+            }
+        }, findPeersDelay);
+    }
+
+    /**
+     * Starts tracking, and connecting to, the given address unless inactive, local, or already tracked.
+     */
+    private void startProbe(TransportAddress transportAddress) {
+        assert holdsLock() : "PeerFinder mutex not held";
+        if (active == false) {
+            logger.trace("startProbe({}) not running", transportAddress);
+            return;
+        }
+
+        if (transportAddress.equals(getLocalNode().getAddress())) {
+            logger.trace("startProbe({}) not probing local node", transportAddress);
+            return;
+        }
+
+        peersByAddress.computeIfAbsent(transportAddress, this::createConnectingPeer);
+    }
+
+    /**
+     * A remote address that is being probed. {@code discoveryNode} is set once the connection attempt has
+     * succeeded; until then the peer does not appear in the list of found peers.
+     */
+    private class Peer {
+        private final TransportAddress transportAddress;
+        private final SetOnce<DiscoveryNode> discoveryNode = new SetOnce<>();
+        private volatile boolean peersRequestInFlight;
+
+        Peer(TransportAddress transportAddress) {
+            this.transportAddress = transportAddress;
+        }
+
+        @Nullable
+        DiscoveryNode getDiscoveryNode() {
+            return discoveryNode.get();
+        }
+
+        void handleWakeUp() {
+            assert holdsLock() : "PeerFinder mutex not held";
+
+            if (active == false) {
+                removePeer();
+                return;
+            }
+
+            final DiscoveryNode discoveryNode = getDiscoveryNode();
+            // may be null if connection not yet established
+
+            if (discoveryNode != null) {
+                if (transportService.nodeConnected(discoveryNode)) {
+                    if (peersRequestInFlight == false) {
+                        requestPeers();
+                    }
+                } else {
+                    logger.trace("{} no longer connected", this);
+                    removePeer();
+                }
+            }
+        }
+
+        void establishConnection() {
+            assert holdsLock() : "PeerFinder mutex not held";
+            assert getDiscoveryNode() == null : "unexpectedly connected to " + getDiscoveryNode();
+            assert active;
+
+            logger.trace("{} attempting connection", this);
+            transportAddressConnector.connectToRemoteMasterNode(transportAddress, new ActionListener<DiscoveryNode>() {
+                @Override
+                public void onResponse(DiscoveryNode remoteNode) {
+                    assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible";
+                    assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node";
+                    synchronized (mutex) {
+                        if (active) {
+                            assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get();
+                            discoveryNode.set(remoteNode);
+                            requestPeers();
+                        }
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e);
+                    // take the mutex like every other mutation of peersByAddress
+                    synchronized (mutex) {
+                        removePeer();
+                    }
+                }
+            });
+        }
+
+        private void removePeer() {
+            // Remove the mapping only if it still refers to this peer: after a deactivation (and possibly a
+            // reactivation) the address may already be unmapped, or mapped to a newer Peer which must be kept.
+            peersByAddress.remove(transportAddress, Peer.this);
+        }
+
+        /**
+         * Sends a {@link PeersRequest} to the connected node. The master node and the known peers in the response
+         * are probed in turn, and {@code onActiveMasterFound} is called if the node names itself as the master.
+         */
+        private void requestPeers() {
+            assert holdsLock() : "PeerFinder mutex not held";
+            assert peersRequestInFlight == false : "PeersRequest already in flight";
+            assert active;
+
+            final DiscoveryNode discoveryNode = getDiscoveryNode();
+            assert discoveryNode != null : "cannot request peers without first connecting";
+
+            logger.trace("{} requesting peers", this);
+            peersRequestInFlight = true;
+
+            List<DiscoveryNode> knownNodes = getFoundPeersUnderLock();
+
+            transportService.sendRequest(discoveryNode, REQUEST_PEERS_ACTION_NAME,
+                new PeersRequest(getLocalNode(), knownNodes),
+                new TransportResponseHandler<PeersResponse>() {
+
+                    @Override
+                    public PeersResponse read(StreamInput in) throws IOException {
+                        return new PeersResponse(in);
+                    }
+
+                    @Override
+                    public void handleResponse(PeersResponse response) {
+                        logger.trace("{} received {}", Peer.this, response);
+                        synchronized (mutex) {
+                            if (active == false) {
+                                return;
+                            }
+
+                            peersRequestInFlight = false;
+
+                            response.getMasterNode().map(DiscoveryNode::getAddress).ifPresent(PeerFinder.this::startProbe);
+                            response.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(PeerFinder.this::startProbe);
+                        }
+
+                        if (response.getMasterNode().equals(Optional.of(discoveryNode))) {
+                            // Must not hold lock here to avoid deadlock
+                            assert holdsLock() == false : "PeerFinder mutex is held in error";
+                            onActiveMasterFound(discoveryNode, response.getTerm());
+                        }
+                    }
+
+                    @Override
+                    public void handleException(TransportException exp) {
+                        peersRequestInFlight = false;
+                        logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp);
+                    }
+
+                    @Override
+                    public String executor() {
+                        return Names.GENERIC;
+                    }
+                });
+        }
+
+        @Override
+        public String toString() {
+            return "Peer{" +
+                "transportAddress=" + transportAddress +
+                ", discoveryNode=" + discoveryNode.get() +
+                ", peersRequestInFlight=" + peersRequestInFlight +
+                '}';
+        }
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/discovery/PeersRequest.java b/server/src/main/java/org/elasticsearch/discovery/PeersRequest.java
new file mode 100644
index 00000000000..346cc098f75
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/discovery/PeersRequest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.transport.TransportRequest;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Request sent by a node to one of its peers, carrying the sender's own node and the peers that the sender
+ * has found so far. The sender itself must not appear in the list of known peers.
+ */
+public class PeersRequest extends TransportRequest {
+    private final DiscoveryNode sourceNode;
+    private final List<DiscoveryNode> knownPeers;
+
+    /**
+     * @param sourceNode the node that sends this request
+     * @param knownPeers the peers known to the sender; must not contain {@code sourceNode}
+     */
+    public PeersRequest(DiscoveryNode sourceNode, List<DiscoveryNode> knownPeers) {
+        assert knownPeers.contains(sourceNode) == false : "local node is not a peer";
+        this.sourceNode = sourceNode;
+        this.knownPeers = knownPeers;
+    }
+
+    public PeersRequest(StreamInput in) throws IOException {
+        super(in);
+        sourceNode = new DiscoveryNode(in);
+        knownPeers = in.readList(DiscoveryNode::new);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        sourceNode.writeTo(out);
+        out.writeList(knownPeers);
+    }
+
+    /**
+     * @return the peers known to the node that sent this request.
+     */
+    public List<DiscoveryNode> getKnownPeers() {
+        return knownPeers;
+    }
+
+    /**
+     * @return the node that sent this request.
+     */
+    public DiscoveryNode getSourceNode() {
+        return sourceNode;
+    }
+
+    @Override
+    public String toString() {
+        return "PeersRequest{" +
+            "sourceNode=" + sourceNode +
+            ", knownPeers=" + knownPeers +
+            '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PeersRequest that = (PeersRequest) o;
+        return Objects.equals(sourceNode, that.sourceNode) &&
+            Objects.equals(knownPeers, that.knownPeers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sourceNode, knownPeers);
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java
index 6afdf81a8b4..904dc0f9687 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java
+++
b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java
@@ -24,7 +24,6 @@ import org.apache.lucene.util.Counter;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPoolInfo;
 import org.elasticsearch.threadpool.ThreadPoolStats;
@@ -51,6 +50,38 @@ public class DeterministicTaskQueue extends AbstractComponent {
         super(settings);
     }
 
+    /**
+     * Runs all runnable tasks, then keeps advancing time and running tasks until no runnable or deferred task is left.
+     */
+    public void runAllTasks() {
+        while (true) {
+            runAllRunnableTasks();
+            if (hasDeferredTasks()) {
+                advanceTime();
+            } else {
+                break;
+            }
+        }
+    }
+
+    /**
+     * Calls {@code runNextTask()} until there are no more runnable tasks.
+     */
+    public void runAllRunnableTasks() {
+        while (hasRunnableTasks()) {
+            runNextTask();
+        }
+    }
+
+    /**
+     * Calls {@code runRandomTask(random)} until there are no more runnable tasks.
+     */
+    public void runAllRunnableTasks(Random random) {
+        while (hasRunnableTasks()) {
+            runRandomTask(random);
+        }
+    }
+
     /**
      * @return whether there are any runnable tasks.
      */
@@ -318,16 +349,6 @@ public class DeterministicTaskQueue extends AbstractComponent {
             public ScheduledExecutorService scheduler() {
                 throw new UnsupportedOperationException();
             }
-
-            @Override
-            public void close() {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public ThreadContext getThreadContext() {
-                throw new UnsupportedOperationException();
-            }
         };
     }
 
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java
index adba2bcc8a0..2097075b32d 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java
@@ -281,7 +281,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
 
         futureExecutor.schedule(() -> strings.add("also runnable"), TimeValue.MINUS_ONE);
 
-        runAllTasks(taskQueue);
+        taskQueue.runAllTasks();
         assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis));
         assertThat(strings, contains("runnable", "also runnable", "deferred"));
 
@@ -295,22 +295,10 @@ public class DeterministicTaskQueueTests extends ESTestCase {
         assertFalse(taskQueue.hasRunnableTasks());
         assertTrue(taskQueue.hasDeferredTasks());
 
-        runAllTasks(taskQueue);
+        taskQueue.runAllTasks();
         assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis + delayMillis1));
-        assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred"));
-    }
 
-    private static void runAllTasks(DeterministicTaskQueue taskQueue) {
-        while (true) {
-            while (taskQueue.hasRunnableTasks()) {
-                taskQueue.runNextTask();
-            }
-            if (taskQueue.hasDeferredTasks()) {
-                taskQueue.advanceTime();
-            } else {
-                break;
-            }
-        }
+        assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred"));
     }
 
     private static DeterministicTaskQueue newTaskQueue() {
diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderMessagesTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderMessagesTests.java
new file mode 100644
index 00000000000..aabb77b67ee
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderMessagesTests.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.coordination.PeersResponse;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.EqualsHashCodeTestUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+
+/**
+ * Checks equals/hashCode consistency and copying through serialization of {@link PeersRequest} and {@link PeersResponse}.
+ */
+public class PeerFinderMessagesTests extends ESTestCase {
+    private DiscoveryNode createNode(String id) {
+        return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT);
+    }
+
+    public void testPeersRequestEqualsHashCodeSerialization() {
+        final PeersRequest initialPeersRequest = new PeersRequest(createNode(randomAlphaOfLength(10)),
+            Arrays.stream(generateRandomStringArray(10, 10, false)).map(this::createNode).collect(Collectors.toList()));
+
+        EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPeersRequest,
+            peersRequest -> copyWriteable(peersRequest, writableRegistry(), PeersRequest::new),
+            in -> {
+                final List<DiscoveryNode> discoveryNodes = new ArrayList<>(in.getKnownPeers());
+                if (randomBoolean()) {
+                    return new PeersRequest(createNode(randomAlphaOfLength(10)), discoveryNodes);
+                } else {
+                    return new PeersRequest(in.getSourceNode(), modifyDiscoveryNodesList(in.getKnownPeers(), true));
+                }
+            });
+    }
+
+    public void testPeersResponseEqualsHashCodeSerialization() {
+        final long initialTerm = randomNonNegativeLong();
+        final PeersResponse initialPeersResponse;
+
+        if (randomBoolean()) {
+            initialPeersResponse = new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), emptyList(), initialTerm);
+        } else {
+            initialPeersResponse = new PeersResponse(Optional.empty(),
+                Arrays.stream(generateRandomStringArray(10, 10, false, false)).map(this::createNode).collect(Collectors.toList()),
+                initialTerm);
+        }
+
+        EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPeersResponse,
+            peersResponse -> copyWriteable(peersResponse, writableRegistry(), PeersResponse::new),
+            in -> {
+                final long term = in.getTerm();
+                if (randomBoolean()) {
+                    return new PeersResponse(in.getMasterNode(), in.getKnownPeers(),
+                        randomValueOtherThan(term, ESTestCase::randomNonNegativeLong));
+                } else {
+                    if (in.getMasterNode().isPresent()) {
+                        if (randomBoolean()) {
+                            return new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), in.getKnownPeers(), term);
+                        } else {
+                            return new PeersResponse(Optional.empty(), singletonList(createNode(randomAlphaOfLength(10))), term);
+                        }
+                    } else {
+                        if (randomBoolean()) {
+                            return new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), emptyList(), term);
+                        } else {
+                            return new PeersResponse(in.getMasterNode(), modifyDiscoveryNodesList(in.getKnownPeers(), false), term);
+                        }
+                    }
+                }
+            });
+    }
+
+    /**
+     * Returns a copy of {@code originalNodes} that differs from it by one removed, replaced or added node.
+     * A node is only removed if that leaves the list non-empty or if {@code allowEmpty} is set.
+     */
+    private List<DiscoveryNode> modifyDiscoveryNodesList(Collection<DiscoveryNode> originalNodes, boolean allowEmpty) {
+        final List<DiscoveryNode> discoveryNodes = new ArrayList<>(originalNodes);
+        if (discoveryNodes.isEmpty() == false && randomBoolean() && (allowEmpty || discoveryNodes.size() > 1)) {
+            discoveryNodes.remove(randomIntBetween(0, discoveryNodes.size() - 1));
+        } else if (discoveryNodes.isEmpty() == false && randomBoolean()) {
+            discoveryNodes.set(randomIntBetween(0, discoveryNodes.size() - 1), createNode(randomAlphaOfLength(10)));
+        } else {
+            discoveryNodes.add(createNode(randomAlphaOfLength(10)));
+        }
+        return discoveryNodes;
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java
new file mode 100644
index 00000000000..c3f5c87d9d9
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java
@@ -0,0 +1,703 @@
+/*
+ * Licensed to Elasticsearch under one or
more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.cluster.coordination.FutureExecutor; +import org.elasticsearch.cluster.coordination.PeersResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.node.DiscoveryNodes.Builder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import 
java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.elasticsearch.discovery.PeerFinder.REQUEST_PEERS_ACTION_NAME;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

/**
 * Tests for {@link PeerFinder}. Time is driven by a {@link DeterministicTaskQueue}, outgoing requests are captured by a
 * {@link CapturingTransport}, and connection attempts are answered by a {@link MockTransportAddressConnector}, so each test
 * controls exactly which addresses are reachable and when scheduled work runs.
 */
public class PeerFinderTests extends ESTestCase {

    private CapturingTransport capturingTransport;
    private DeterministicTaskQueue deterministicTaskQueue;
    private DiscoveryNode localNode;
    private MockTransportAddressConnector transportAddressConnector;
    private TestPeerFinder peerFinder;
    private List<TransportAddress> providedAddresses;
    private long addressResolveDelay; // -1 means address resolution fails

    // Together these back CapturingTransport#nodeConnected: every node it is asked about must be in exactly one of them.
    private final Set<DiscoveryNode> disconnectedNodes = new HashSet<>();
    private final Set<DiscoveryNode> connectedNodes = new HashSet<>();
    private DiscoveryNodes lastAcceptedNodes;
    private TransportService transportService;

    // Delay applied by the mock connector before it reports the outcome of a connection attempt to a "slow" address.
    private static final long CONNECTION_TIMEOUT_MILLIS = 30000;

    /**
     * A {@link TransportAddressConnector} whose outcomes are scripted by the test: addresses in {@link #unreachableAddresses}
     * fail, addresses of master-eligible nodes in {@link #reachableNodes} succeed, and addresses in {@link #slowAddresses} report
     * their outcome only after {@link #CONNECTION_TIMEOUT_MILLIS}. It also asserts that there is never more than one in-flight
     * connection attempt per address.
     */
    class MockTransportAddressConnector implements TransportAddressConnector {
        final Set<DiscoveryNode> reachableNodes = new HashSet<>();
        final Set<TransportAddress> unreachableAddresses = new HashSet<>();
        final Set<TransportAddress> slowAddresses = new HashSet<>();
        final Set<TransportAddress> inFlightConnectionAttempts = new HashSet<>();

        @Override
        public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener) {
            assert localNode.getAddress().equals(transportAddress) == false : "should not probe local node";

            // Set#add returns false if the address was already present, i.e. a concurrent attempt to the same address.
            final boolean isNotInFlight = inFlightConnectionAttempts.add(transportAddress);
            assertTrue(isNotInFlight);

            final long connectResultTime = deterministicTaskQueue.getCurrentTimeMillis()
                + (slowAddresses.contains(transportAddress) ? CONNECTION_TIMEOUT_MILLIS : 0);

            deterministicTaskQueue.scheduleAt(connectResultTime, new Runnable() {
                @Override
                public void run() {
                    // Reachability is evaluated when the result is delivered, not when the attempt was started.
                    if (unreachableAddresses.contains(transportAddress)) {
                        assertTrue(inFlightConnectionAttempts.remove(transportAddress));
                        listener.onFailure(new IOException("cannot connect to " + transportAddress));
                        return;
                    }

                    for (final DiscoveryNode discoveryNode : reachableNodes) {
                        if (discoveryNode.getAddress().equals(transportAddress)) {
                            if (discoveryNode.isMasterNode()) {
                                disconnectedNodes.remove(discoveryNode);
                                connectedNodes.add(discoveryNode);
                                assertTrue(inFlightConnectionAttempts.remove(transportAddress));
                                listener.onResponse(discoveryNode);
                                return;
                            } else {
                                // The attempt has completed, so clear the in-flight marker just like the other completion
                                // paths do; otherwise a later probe of this address would spuriously fail the check above.
                                assertTrue(inFlightConnectionAttempts.remove(transportAddress));
                                listener.onFailure(new ElasticsearchException("non-master node " + discoveryNode));
                                return;
                            }
                        }
                    }

                    // Every probed address must have been declared either unreachable or reachable by the test.
                    throw new AssertionError(transportAddress + " unknown");
                }

                @Override
                public String toString() {
                    return "connection attempt to " + transportAddress;
                }
            });
        }
    }

    /**
     * A {@link PeerFinder} that records the master node and term passed to {@link #onActiveMasterFound}, and asserts that this
     * callback is invoked at most once and without holding the PeerFinder's lock.
     */
    class TestPeerFinder extends PeerFinder {
        DiscoveryNode discoveredMasterNode;
        OptionalLong discoveredMasterTerm = OptionalLong.empty();

        TestPeerFinder(Settings settings, TransportService transportService, FutureExecutor futureExecutor,
                       TransportAddressConnector transportAddressConnector) {
            super(settings, transportService, futureExecutor, transportAddressConnector, PeerFinderTests.this::resolveConfiguredHosts);
        }

        @Override
        protected void onActiveMasterFound(DiscoveryNode masterNode, long term) {
            assert holdsLock() == false : "PeerFinder lock held in error";
            assertThat(discoveredMasterNode, nullValue());
            assertFalse(discoveredMasterTerm.isPresent());
            discoveredMasterNode = masterNode;
            discoveredMasterTerm = OptionalLong.of(term);
        }
    }

    /**
     * Simulates resolution of the configured hosts list: after {@link #addressResolveDelay} milliseconds the current
     * {@link #providedAddresses} are passed to {@code onResult}. If the delay is {@code -1} the callback is never invoked.
     *
     * @param onResult receives the resolved addresses
     */
    private void resolveConfiguredHosts(Consumer<List<TransportAddress>> onResult) {
        if (addressResolveDelay >= 0) {
            deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + addressResolveDelay, new Runnable() {
                @Override
                public void run() {
                    onResult.accept(providedAddresses);
                }

                @Override
                public String toString() {
                    return "PeerFinderTests#resolveConfiguredHosts";
                }
            });
        } else {
            // -1 is the only supported negative value
            assertThat(addressResolveDelay, is(-1L));
        }
    }

    /**
     * Replaces {@link #lastAcceptedNodes} with a copy modified by {@code onBuilder}.
     *
     * @param onBuilder applies the desired changes to a builder seeded with the current nodes
     */
    private void updateLastAcceptedNodes(Consumer<Builder> onBuilder) {
        final Builder builder = DiscoveryNodes.builder(lastAcceptedNodes);
        onBuilder.accept(builder);
        lastAcceptedNodes = builder.build();
    }

    @Before
    public void setup() {
        capturingTransport = new CapturingTransport() {
            @Override
            public boolean nodeConnected(DiscoveryNode node) {
                final boolean isConnected = connectedNodes.contains(node);
                final boolean isDisconnected = disconnectedNodes.contains(node);
                assert isConnected != isDisconnected : node + ": isConnected=" + isConnected + ", isDisconnected=" + isDisconnected;
                return isConnected;
            }
        };
        transportAddressConnector = new MockTransportAddressConnector();
        providedAddresses = new ArrayList<>();
        addressResolveDelay = 0L;

        final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
        deterministicTaskQueue = new DeterministicTaskQueue(settings);

        localNode = newDiscoveryNode("local-node");
        transportService = new TransportService(settings, capturingTransport,
            deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR,
            boundTransportAddress -> localNode, null, emptySet());
        transportService.start();
        transportService.acceptIncomingRequests();

        lastAcceptedNodes = DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build();

        peerFinder = new TestPeerFinder(settings, transportService,
            deterministicTaskQueue.getFutureExecutor(), transportAddressConnector);
    }

    @After
    public void deactivateAndRunRemainingTasks() {
        peerFinder.deactivate(localNode);
        deterministicTaskQueue.runAllTasks(); // termination ensures that everything is properly cleaned up
        peerFinder.assertInactiveWithNoKnownPeers(); // should eventually have no nodes when deactivated
    }

    public void testAddsReachableNodesFromUnicastHostsList() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers(otherNode);
    }

    public void testAddsReachableNodesFromUnicastHostsListProvidedLater() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);
        addressResolveDelay = 10000;

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();
        assertFoundPeers();

        final long successTime = addressResolveDelay + PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis();
        while (deterministicTaskQueue.getCurrentTimeMillis() < successTime) {
            deterministicTaskQueue.advanceTime();
            runAllRunnableTasks();
        }

        assertFoundPeers(otherNode);
    }

    public void testDoesNotRequireAddressResolutionToSucceed() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);
        addressResolveDelay = -1;

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();
        assertFoundPeers();

        final long successTime = 10000 + PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis();
        while (deterministicTaskQueue.getCurrentTimeMillis() < successTime) {
            deterministicTaskQueue.advanceTime();
            runAllRunnableTasks();
        }

        assertFoundPeers();
    }

    public void testDoesNotAddUnreachableNodesFromUnicastHostsList() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.unreachableAddresses.add(otherNode.getAddress());

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers();
    }

    public void testDoesNotAddNonMasterEligibleNodesFromUnicastHostsList() {
        final DiscoveryNode nonMasterNode = new DiscoveryNode("node-from-hosts-list", buildNewFakeTransportAddress(),
            emptyMap(), emptySet(), Version.CURRENT);

        providedAddresses.add(nonMasterNode.getAddress());
        transportAddressConnector.reachableNodes.add(nonMasterNode);

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers();

        assertThat(capturingTransport.capturedRequests(), emptyArray());
    }

    public void testChecksUnicastHostsForChanges() {
        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();
        assertFoundPeers();

        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);

        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks();

        assertFoundPeers(otherNode);
    }

    public void testDeactivationClearsPastKnowledge() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers(otherNode);

        peerFinder.deactivate(localNode);

        providedAddresses.clear();
        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();
        assertFoundPeers();
    }

    public void testAddsReachableNodesFromClusterState() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-in-cluster-state");
        updateLastAcceptedNodes(b -> b.add(otherNode));
        transportAddressConnector.reachableNodes.add(otherNode);

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers(otherNode);
    }

    public void testDoesNotAddUnreachableNodesFromClusterState() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-in-cluster-state");
        updateLastAcceptedNodes(b -> b.add(otherNode));
        transportAddressConnector.unreachableAddresses.add(otherNode.getAddress());

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();
        assertFoundPeers();
    }

    public void testAddsReachableNodesFromIncomingRequests() {
        final DiscoveryNode sourceNode = newDiscoveryNode("request-source");
        final DiscoveryNode otherKnownNode = newDiscoveryNode("other-known-node");

        transportAddressConnector.reachableNodes.add(sourceNode);
        transportAddressConnector.reachableNodes.add(otherKnownNode);

        peerFinder.activate(lastAcceptedNodes);
        peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.singletonList(otherKnownNode)));
        runAllRunnableTasks();

        assertFoundPeers(sourceNode, otherKnownNode);
    }

    public void testDoesNotAddUnreachableNodesFromIncomingRequests() {
        final DiscoveryNode sourceNode = newDiscoveryNode("request-source");
        final DiscoveryNode otherKnownNode = newDiscoveryNode("other-known-node");

        transportAddressConnector.reachableNodes.add(sourceNode);
        transportAddressConnector.unreachableAddresses.add(otherKnownNode.getAddress());

        peerFinder.activate(lastAcceptedNodes);
        peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.singletonList(otherKnownNode)));
        runAllRunnableTasks();

        assertFoundPeers(sourceNode);
    }

    public void testDoesNotAddUnreachableSourceNodeFromIncomingRequests() {
        final DiscoveryNode sourceNode = newDiscoveryNode("request-source");
        final DiscoveryNode otherKnownNode = newDiscoveryNode("other-known-node");

        transportAddressConnector.unreachableAddresses.add(sourceNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherKnownNode);

        peerFinder.activate(lastAcceptedNodes);
        peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.singletonList(otherKnownNode)));
        runAllRunnableTasks();

        assertFoundPeers(otherKnownNode);
    }

    public void testRespondsToRequestWhenActive() {
        final DiscoveryNode sourceNode = newDiscoveryNode("request-source");

        transportAddressConnector.reachableNodes.add(sourceNode);

        peerFinder.activate(lastAcceptedNodes);
        final PeersResponse peersResponse1 = peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.emptyList()));
        assertFalse(peersResponse1.getMasterNode().isPresent());
        assertThat(peersResponse1.getKnownPeers(), empty()); // sourceNode is not yet known
        assertThat(peersResponse1.getTerm(), is(0L));

        runAllRunnableTasks();

        assertFoundPeers(sourceNode);

        final long updatedTerm = randomNonNegativeLong();
        peerFinder.setCurrentTerm(updatedTerm);
        final PeersResponse peersResponse2 = peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.emptyList()));
        assertFalse(peersResponse2.getMasterNode().isPresent());
        assertThat(peersResponse2.getKnownPeers(), contains(sourceNode));
        assertThat(peersResponse2.getTerm(), is(updatedTerm));
    }

    public void testDelegatesRequestHandlingWhenInactive() {
        final DiscoveryNode masterNode = newDiscoveryNode("master-node");
        final DiscoveryNode sourceNode = newDiscoveryNode("request-source");
        transportAddressConnector.reachableNodes.add(sourceNode);

        peerFinder.activate(DiscoveryNodes.EMPTY_NODES);

        final long term = randomNonNegativeLong();
        peerFinder.setCurrentTerm(term);
        peerFinder.deactivate(masterNode);

        final PeersResponse expectedResponse = new PeersResponse(Optional.of(masterNode), Collections.emptyList(), term);
        final PeersResponse peersResponse = peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.emptyList()));
        assertThat(peersResponse, equalTo(expectedResponse));
    }

    public void testReceivesRequestsFromTransportService() {
        final DiscoveryNode sourceNode = newDiscoveryNode("request-source");

        transportAddressConnector.reachableNodes.add(sourceNode);

        peerFinder.activate(lastAcceptedNodes);

        final AtomicBoolean responseReceived = new AtomicBoolean();

        transportService.sendRequest(localNode, REQUEST_PEERS_ACTION_NAME, new PeersRequest(sourceNode, Collections.emptyList()),
            new TransportResponseHandler<PeersResponse>() {
                @Override
                public void handleResponse(PeersResponse response) {
                    // compareAndSet also verifies that the response is delivered only once
                    assertTrue(responseReceived.compareAndSet(false, true));
                    assertFalse(response.getMasterNode().isPresent());
                    assertThat(response.getKnownPeers(), empty()); // sourceNode is not yet known
                    assertThat(response.getTerm(), is(0L));
                }

                @Override
                public void handleException(TransportException exp) {
                    throw new AssertionError("unexpected", exp);
                }

                @Override
                public String executor() {
                    return Names.SAME;
                }
            });

        runAllRunnableTasks();
        assertTrue(responseReceived.get());
        assertFoundPeers(sourceNode);
    }

    public void testRequestsPeersIncludingKnownPeersInRequest() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers(otherNode);

        final CapturedRequest[] capturedRequests = capturingTransport.getCapturedRequestsAndClear();
        assertThat(capturedRequests.length, is(1));
        final PeersRequest peersRequest = (PeersRequest) capturedRequests[0].request;
        assertThat(peersRequest.getKnownPeers(), contains(otherNode));
    }

    public void testAddsReachablePeersFromResponse() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers(otherNode);

        final DiscoveryNode discoveredNode = newDiscoveryNode("discovered-node");
        transportAddressConnector.reachableNodes.add(discoveredNode);
        respondToRequests(node -> {
            assertThat(node, is(otherNode));
            return new PeersResponse(Optional.empty(), singletonList(discoveredNode), randomNonNegativeLong());
        });

        runAllRunnableTasks();
        assertFoundPeers(otherNode, discoveredNode);
    }

    public void testAddsReachableMasterFromResponse() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers(otherNode);
        final DiscoveryNode discoveredMaster = newDiscoveryNode("discovered-master");

        respondToRequests(node -> {
            assertThat(node, is(otherNode));
            return new PeersResponse(Optional.of(discoveredMaster), emptyList(), randomNonNegativeLong());
        });

        transportAddressConnector.reachableNodes.add(discoveredMaster);
        runAllRunnableTasks();
        assertFoundPeers(otherNode, discoveredMaster);
        // the master was reported by another node, not by the master itself, so no active master has been found yet
        assertThat(peerFinder.discoveredMasterNode, nullValue());
        assertFalse(peerFinder.discoveredMasterTerm.isPresent());
    }

    public void testHandlesDiscoveryOfMasterFromResponseFromMaster() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers(otherNode);

        final long term = randomNonNegativeLong();
        respondToRequests(node -> {
            assertThat(node, is(otherNode));
            return new PeersResponse(Optional.of(otherNode), emptyList(), term);
        });

        runAllRunnableTasks();
        assertFoundPeers(otherNode);
        assertThat(peerFinder.discoveredMasterNode, is(otherNode));
        assertThat(peerFinder.discoveredMasterTerm, is(OptionalLong.of(term)));
    }

    public void testOnlyRequestsPeersOncePerRoundButDoesRetryNextRound() {
        final DiscoveryNode sourceNode = newDiscoveryNode("request-source");
        transportAddressConnector.reachableNodes.add(sourceNode);

        peerFinder.activate(lastAcceptedNodes);
        peerFinder.handlePeersRequest(new PeersRequest(sourceNode, emptyList()));
        runAllRunnableTasks();
        assertFoundPeers(sourceNode);

        respondToRequests(node -> {
            assertThat(node, is(sourceNode));
            return new PeersResponse(Optional.empty(), singletonList(sourceNode), randomNonNegativeLong());
        });

        peerFinder.handlePeersRequest(new PeersRequest(sourceNode, emptyList()));
        runAllRunnableTasks();
        respondToRequests(node -> {
            throw new AssertionError("there should have been no further requests");
        });

        final DiscoveryNode otherNode = newDiscoveryNode("otherNode");
        transportAddressConnector.reachableNodes.add(otherNode);

        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks();
        respondToRequests(node -> {
            assertThat(node, is(sourceNode));
            return new PeersResponse(Optional.empty(), singletonList(otherNode), randomNonNegativeLong());
        });
        runAllRunnableTasks();
        assertFoundPeers(sourceNode, otherNode);
    }

    public void testDoesNotReconnectToNodesOnceConnected() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers(otherNode);

        // a further connection attempt to this address would now hit the connector's "unknown" AssertionError
        transportAddressConnector.reachableNodes.clear();
        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks();

        assertFoundPeers(otherNode);
    }

    public void testDiscardsDisconnectedNodes() {
        final DiscoveryNode otherNode = newDiscoveryNode("original-node");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers(otherNode);

        transportAddressConnector.reachableNodes.clear();
        transportAddressConnector.unreachableAddresses.add(otherNode.getAddress());
        connectedNodes.remove(otherNode);
        disconnectedNodes.add(otherNode);

        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks();
        assertFoundPeers();
    }

    public void testDoesNotMakeMultipleConcurrentConnectionAttemptsToOneAddress() {
        final DiscoveryNode otherNode = newDiscoveryNode("node-from-hosts-list");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.unreachableAddresses.add(otherNode.getAddress());
        transportAddressConnector.slowAddresses.add(otherNode.getAddress());

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();
        assertFoundPeers();

        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks(); // MockTransportAddressConnector verifies no multiple connection attempts
        assertFoundPeers();

        transportAddressConnector.slowAddresses.clear();
        transportAddressConnector.unreachableAddresses.clear();
        transportAddressConnector.reachableNodes.add(otherNode);

        while (deterministicTaskQueue.getCurrentTimeMillis() < CONNECTION_TIMEOUT_MILLIS) {
            assertFoundPeers();
            deterministicTaskQueue.advanceTime();
            runAllRunnableTasks();
        }

        // need to wait for the connection to timeout, then for another wakeup, before discovering the peer
        final long expectedTime = CONNECTION_TIMEOUT_MILLIS + PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis();

        while (deterministicTaskQueue.getCurrentTimeMillis() < expectedTime) {
            deterministicTaskQueue.advanceTime();
            runAllRunnableTasks();
        }

        assertFoundPeers(otherNode);
    }

    public void testReconnectsToDisconnectedNodes() {
        final DiscoveryNode otherNode = newDiscoveryNode("original-node");
        providedAddresses.add(otherNode.getAddress());
        transportAddressConnector.reachableNodes.add(otherNode);

        peerFinder.activate(lastAcceptedNodes);
        runAllRunnableTasks();

        assertFoundPeers(otherNode);

        // a different node is now listening at the same address
        transportAddressConnector.reachableNodes.clear();
        final DiscoveryNode rebootedOtherNode = new DiscoveryNode("rebooted-node", otherNode.getAddress(), Version.CURRENT);
        transportAddressConnector.reachableNodes.add(rebootedOtherNode);

        connectedNodes.remove(otherNode);
        disconnectedNodes.add(otherNode);

        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks();

        assertFoundPeers(rebootedOtherNode);
    }

    /**
     * Drains the requests captured by the transport, checks that each is a {@link PeersRequest} sent by the local node, and
     * answers each one with the response that {@code responseFactory} produces for the request's destination node.
     *
     * @param responseFactory maps the destination node of a captured request to the response it should receive
     */
    private void respondToRequests(Function<DiscoveryNode, PeersResponse> responseFactory) {
        final CapturedRequest[] capturedRequests = capturingTransport.getCapturedRequestsAndClear();
        for (final CapturedRequest capturedRequest : capturedRequests) {
            assertThat(capturedRequest.action, is(REQUEST_PEERS_ACTION_NAME));
            assertThat(capturedRequest.request, instanceOf(PeersRequest.class));
            final PeersRequest peersRequest = (PeersRequest) capturedRequest.request;
            assertThat(peersRequest.getSourceNode(), is(localNode));
            // respond to this request's own id, not always the first one, so every captured request gets its response
            capturingTransport.handleResponse(capturedRequest.requestId, responseFactory.apply(capturedRequest.node));
        }
    }

    /**
     * Asserts that the peers currently reported by the {@link PeerFinder} are exactly the given nodes, ignoring order.
     *
     * @param expectedNodesArray the nodes expected to have been found; none means that no peers are expected
     */
    private void assertFoundPeers(DiscoveryNode... expectedNodesArray) {
        final Stream<DiscoveryNode> expectedNodes = Arrays.stream(expectedNodesArray);
        final Stream<DiscoveryNode> actualNodes = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false);
        assertThat(actualNodes.collect(Collectors.toSet()), equalTo(expectedNodes.collect(Collectors.toSet())));
    }

    /**
     * Creates a node with the given id, a fresh fake transport address and the current version.
     */
    private DiscoveryNode newDiscoveryNode(String nodeId) {
        return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), Version.CURRENT);
    }

    /**
     * Runs every task that is currently runnable on the deterministic task queue, using the test's source of randomness.
     */
    private void runAllRunnableTasks() {
        deterministicTaskQueue.runAllRunnableTasks(random());
    }
}