[Zen2] Introduce gossip-like discovery of master nodes (#32246)

This commit introduces the `PeerFinder` which can be used to collect the
identities of the master-eligible nodes in a masterless cluster, based on the
`UnicastHostsProvider`, the nodes in the `ClusterState`, and nodes that other
nodes have discovered.
This commit is contained in:
David Turner 2018-08-06 15:26:31 +01:00 committed by GitHub
parent d80b639c18
commit 2176184db1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1415 additions and 27 deletions

View File

@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
 * The response to a peers request. It carries the responding node's current term together with either the node that the
 * responder believes to be the master, or else the list of peers that the responder knows about; it never carries both.
 */
public class PeersResponse extends TransportResponse {

    private final Optional<DiscoveryNode> masterNode;
    private final List<DiscoveryNode> knownPeers;
    private final long term;

    /**
     * @param masterNode the node that the responder believes to be the master, if any.
     * @param knownPeers the peers known to the responder; must be empty whenever {@code masterNode} is present.
     * @param term       the current term of the responder.
     */
    public PeersResponse(Optional<DiscoveryNode> masterNode, List<DiscoveryNode> knownPeers, long term) {
        assert hasNoPeersIfMasterKnown(masterNode, knownPeers);
        this.masterNode = masterNode;
        this.knownPeers = knownPeers;
        this.term = term;
    }

    /**
     * Reads a response from the given stream, in the order in which {@link #writeTo(StreamOutput)} writes it.
     */
    public PeersResponse(StreamInput in) throws IOException {
        final DiscoveryNode masterOrNull = in.readOptionalWriteable(DiscoveryNode::new);
        masterNode = Optional.ofNullable(masterOrNull);
        knownPeers = in.readList(DiscoveryNode::new);
        term = in.readLong();
        assert hasNoPeersIfMasterKnown(masterNode, knownPeers);
    }

    // the invariant checked by both constructors: a response that names a master does not also list peers
    private static boolean hasNoPeersIfMasterKnown(Optional<DiscoveryNode> masterNode, List<DiscoveryNode> knownPeers) {
        return masterNode.isPresent() == false || knownPeers.isEmpty();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        final DiscoveryNode masterOrNull = masterNode.orElse(null);
        out.writeOptionalWriteable(masterOrNull);
        out.writeList(knownPeers);
        out.writeLong(term);
    }

    /**
     * @return the node that is currently leading, according to the responding node.
     */
    public Optional<DiscoveryNode> getMasterNode() {
        return masterNode;
    }

    /**
     * @return the collection of known peers of the responding node, or an empty collection if the responding node believes there
     *         is currently a leader.
     */
    public List<DiscoveryNode> getKnownPeers() {
        return knownPeers;
    }

    /**
     * @return the current term of the responding node. If the responding node is the leader then this is the term in which it is
     *         currently leading.
     */
    public long getTerm() {
        return term;
    }

    @Override
    public String toString() {
        return new StringBuilder("PeersResponse{")
            .append("masterNode=").append(masterNode)
            .append(", knownPeers=").append(knownPeers)
            .append(", term=").append(term)
            .append('}')
            .toString();
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final PeersResponse other = (PeersResponse) o;
        if (term != other.term) {
            return false;
        }
        if (Objects.equals(masterNode, other.masterNode) == false) {
            return false;
        }
        return Objects.equals(knownPeers, other.knownPeers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(masterNode, knownPeers, term);
    }
}

View File

@ -54,6 +54,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.PeerFinder;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
@ -423,6 +424,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
Node.BREAKER_TYPE_KEY,
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
IndexGraveyard.SETTING_MAX_TOMBSTONES,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING
)));
}

View File

@ -0,0 +1,387 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.coordination.FutureExecutor;
import org.elasticsearch.cluster.coordination.PeersResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
public abstract class PeerFinder extends AbstractComponent {
public static final String REQUEST_PEERS_ACTION_NAME = "internal:discovery/request_peers";
// the time between attempts to find all peers
public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_SETTING =
Setting.timeSetting("discovery.find_peers_interval",
TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
private final TimeValue findPeersDelay;
private final Object mutex = new Object();
private final TransportService transportService;
private final FutureExecutor futureExecutor;
private final TransportAddressConnector transportAddressConnector;
private final ConfiguredHostsResolver configuredHostsResolver;
private volatile long currentTerm;
private boolean active;
private DiscoveryNodes lastAcceptedNodes;
private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap();
private Optional<DiscoveryNode> leader = Optional.empty();
PeerFinder(Settings settings, TransportService transportService, FutureExecutor futureExecutor,
TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) {
super(settings);
findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
this.transportService = transportService;
this.futureExecutor = futureExecutor;
this.transportAddressConnector = transportAddressConnector;
this.configuredHostsResolver = configuredHostsResolver;
transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
PeersRequest::new,
(request, channel, task) -> channel.sendResponse(handlePeersRequest(request)));
}
public void activate(final DiscoveryNodes lastAcceptedNodes) {
logger.trace("activating with {}", lastAcceptedNodes);
synchronized (mutex) {
assert active == false;
active = true;
this.lastAcceptedNodes = lastAcceptedNodes;
leader = Optional.empty();
handleWakeUp();
}
}
public void deactivate(DiscoveryNode leader) {
synchronized (mutex) {
logger.trace("deactivating and setting leader to {}", leader);
active = false;
handleWakeUp();
this.leader = Optional.of(leader);
assert assertInactiveWithNoKnownPeers();
}
}
// exposed to subclasses for testing
protected final boolean holdsLock() {
return Thread.holdsLock(mutex);
}
boolean assertInactiveWithNoKnownPeers() {
assert active == false;
assert peersByAddress.isEmpty();
return true;
}
PeersResponse handlePeersRequest(PeersRequest peersRequest) {
synchronized (mutex) {
assert peersRequest.getSourceNode().equals(getLocalNode()) == false;
if (active) {
startProbe(peersRequest.getSourceNode().getAddress());
peersRequest.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(this::startProbe);
return new PeersResponse(Optional.empty(), getFoundPeersUnderLock(), currentTerm);
} else {
return new PeersResponse(leader, Collections.emptyList(), currentTerm);
}
}
}
public void setCurrentTerm(long currentTerm) {
this.currentTerm = currentTerm;
}
private DiscoveryNode getLocalNode() {
final DiscoveryNode localNode = transportService.getLocalNode();
assert localNode != null;
return localNode;
}
/**
* Called on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join.
*/
protected abstract void onActiveMasterFound(DiscoveryNode masterNode, long term);
public interface TransportAddressConnector {
/**
* Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it.
*/
void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener);
}
public interface ConfiguredHostsResolver {
/**
* Attempt to resolve the configured unicast hosts list to a list of transport addresses.
*
* @param consumer Consumer for the resolved list. May not be called if an error occurs or if another resolution attempt is in
* progress.
*/
void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer);
}
public Iterable<DiscoveryNode> getFoundPeers() {
synchronized (mutex) {
return getFoundPeersUnderLock();
}
}
private List<DiscoveryNode> getFoundPeersUnderLock() {
assert active;
assert holdsLock() : "PeerFinder mutex not held";
return peersByAddress.values().stream().map(Peer::getDiscoveryNode).filter(Objects::nonNull).collect(Collectors.toList());
}
private Peer createConnectingPeer(TransportAddress transportAddress) {
Peer peer = new Peer(transportAddress);
peer.establishConnection();
return peer;
}
private void handleWakeUp() {
assert holdsLock() : "PeerFinder mutex not held";
for (final Peer peer : peersByAddress.values()) {
peer.handleWakeUp();
}
if (active == false) {
logger.trace("not active");
return;
}
logger.trace("probing master nodes from cluster state: {}", lastAcceptedNodes);
for (ObjectCursor<DiscoveryNode> discoveryNodeObjectCursor : lastAcceptedNodes.getMasterNodes().values()) {
startProbe(discoveryNodeObjectCursor.value.getAddress());
}
configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> {
synchronized (mutex) {
logger.trace("probing resolved transport addresses {}", providedAddresses);
providedAddresses.forEach(this::startProbe);
}
});
futureExecutor.schedule(new AbstractRunnable() {
@Override
public boolean isForceExecution() {
return true;
}
@Override
public void onFailure(Exception e) {
assert false : e;
logger.debug("unexpected exception in wakeup", e);
}
@Override
protected void doRun() {
synchronized (mutex) {
handleWakeUp();
}
}
@Override
public String toString() {
return "PeerFinder::handleWakeUp";
}
}, findPeersDelay);
}
private void startProbe(TransportAddress transportAddress) {
assert holdsLock() : "PeerFinder mutex not held";
if (active == false) {
logger.trace("startProbe({}) not running", transportAddress);
return;
}
if (transportAddress.equals(getLocalNode().getAddress())) {
logger.trace("startProbe({}) not probing local node", transportAddress);
return;
}
peersByAddress.computeIfAbsent(transportAddress, this::createConnectingPeer);
}
private class Peer {
private final TransportAddress transportAddress;
private SetOnce<DiscoveryNode> discoveryNode = new SetOnce<>();
private volatile boolean peersRequestInFlight;
Peer(TransportAddress transportAddress) {
this.transportAddress = transportAddress;
}
@Nullable
DiscoveryNode getDiscoveryNode() {
return discoveryNode.get();
}
void handleWakeUp() {
assert holdsLock() : "PeerFinder mutex not held";
if (active == false) {
removePeer();
return;
}
final DiscoveryNode discoveryNode = getDiscoveryNode();
// may be null if connection not yet established
if (discoveryNode != null) {
if (transportService.nodeConnected(discoveryNode)) {
if (peersRequestInFlight == false) {
requestPeers();
}
} else {
logger.trace("{} no longer connected", this);
removePeer();
}
}
}
void establishConnection() {
assert holdsLock() : "PeerFinder mutex not held";
assert getDiscoveryNode() == null : "unexpectedly connected to " + getDiscoveryNode();
assert active;
logger.trace("{} attempting connection", this);
transportAddressConnector.connectToRemoteMasterNode(transportAddress, new ActionListener<DiscoveryNode>() {
@Override
public void onResponse(DiscoveryNode remoteNode) {
assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible";
assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node";
synchronized (mutex) {
if (active) {
assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get();
discoveryNode.set(remoteNode);
requestPeers();
}
}
}
@Override
public void onFailure(Exception e) {
logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e);
removePeer();
}
});
}
private void removePeer() {
final Peer removed = peersByAddress.remove(transportAddress);
assert removed == Peer.this;
}
private void requestPeers() {
assert holdsLock() : "PeerFinder mutex not held";
assert peersRequestInFlight == false : "PeersRequest already in flight";
assert active;
final DiscoveryNode discoveryNode = getDiscoveryNode();
assert discoveryNode != null : "cannot request peers without first connecting";
logger.trace("{} requesting peers", this);
peersRequestInFlight = true;
List<DiscoveryNode> knownNodes = getFoundPeersUnderLock();
transportService.sendRequest(discoveryNode, REQUEST_PEERS_ACTION_NAME,
new PeersRequest(getLocalNode(), knownNodes),
new TransportResponseHandler<PeersResponse>() {
@Override
public PeersResponse read(StreamInput in) throws IOException {
return new PeersResponse(in);
}
@Override
public void handleResponse(PeersResponse response) {
logger.trace("{} received {}", Peer.this, response);
synchronized (mutex) {
if (active == false) {
return;
}
peersRequestInFlight = false;
response.getMasterNode().map(DiscoveryNode::getAddress).ifPresent(PeerFinder.this::startProbe);
response.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(PeerFinder.this::startProbe);
}
if (response.getMasterNode().equals(Optional.of(discoveryNode))) {
// Must not hold lock here to avoid deadlock
assert holdsLock() == false : "PeerFinder mutex is held in error";
onActiveMasterFound(discoveryNode, response.getTerm());
}
}
@Override
public void handleException(TransportException exp) {
peersRequestInFlight = false;
logger.debug(new ParameterizedMessage("{} peers request failed", this), exp);
}
@Override
public String executor() {
return Names.GENERIC;
}
});
}
@Override
public String toString() {
return "Peer{" +
"transportAddress=" + transportAddress +
", discoveryNode=" + discoveryNode.get() +
", peersRequestInFlight=" + peersRequestInFlight +
'}';
}
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
/**
 * A request sent from one node to another, identifying the sender and listing the peers that the sender has found so far.
 */
public class PeersRequest extends TransportRequest {

    private final DiscoveryNode sourceNode;
    private final List<DiscoveryNode> knownPeers;

    /**
     * @param sourceNode the node sending this request.
     * @param knownPeers the peers known to the sender; must not contain {@code sourceNode}.
     */
    public PeersRequest(DiscoveryNode sourceNode, List<DiscoveryNode> knownPeers) {
        assert knownPeers.contains(sourceNode) == false : "local node is not a peer";
        this.sourceNode = sourceNode;
        this.knownPeers = knownPeers;
    }

    /**
     * Reads a request from the given stream, in the order in which {@link #writeTo(StreamOutput)} writes it.
     */
    public PeersRequest(StreamInput in) throws IOException {
        super(in);
        this.sourceNode = new DiscoveryNode(in);
        this.knownPeers = in.readList(DiscoveryNode::new);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        sourceNode.writeTo(out);
        out.writeList(knownPeers);
    }

    /**
     * @return the peers known to the node that sent this request.
     */
    public List<DiscoveryNode> getKnownPeers() {
        return knownPeers;
    }

    /**
     * @return the node that sent this request.
     */
    public DiscoveryNode getSourceNode() {
        return sourceNode;
    }

    @Override
    public String toString() {
        return new StringBuilder("PeersRequest{")
            .append("sourceNode=").append(sourceNode)
            .append(", knownPeers=").append(knownPeers)
            .append('}')
            .toString();
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final PeersRequest other = (PeersRequest) o;
        if (Objects.equals(sourceNode, other.sourceNode) == false) {
            return false;
        }
        return Objects.equals(knownPeers, other.knownPeers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourceNode, knownPeers);
    }
}

View File

@ -24,7 +24,6 @@ import org.apache.lucene.util.Counter;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.threadpool.ThreadPoolStats;
@ -51,6 +50,29 @@ public class DeterministicTaskQueue extends AbstractComponent {
super(settings);
}
/**
 * Runs every runnable task and then, for as long as deferred tasks remain, advances time and runs whatever has become runnable.
 */
public void runAllTasks() {
    runAllRunnableTasks();
    while (hasDeferredTasks()) {
        advanceTime();
        runAllRunnableTasks();
    }
}
/**
 * Runs the next task repeatedly until no runnable tasks remain.
 */
public void runAllRunnableTasks() {
    while (true) {
        if (hasRunnableTasks() == false) {
            return;
        }
        runNextTask();
    }
}
/**
 * Runs a randomly-chosen task repeatedly until no runnable tasks remain.
 *
 * @param random the source of randomness passed to each call of {@code runRandomTask}.
 */
public void runAllRunnableTasks(Random random) {
    while (true) {
        if (hasRunnableTasks() == false) {
            return;
        }
        runRandomTask(random);
    }
}
/**
* @return whether there are any runnable tasks.
*/
@ -318,16 +340,6 @@ public class DeterministicTaskQueue extends AbstractComponent {
public ScheduledExecutorService scheduler() {
throw new UnsupportedOperationException();
}
@Override
public void close() {
throw new UnsupportedOperationException();
}
@Override
public ThreadContext getThreadContext() {
throw new UnsupportedOperationException();
}
};
}

View File

@ -281,7 +281,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
futureExecutor.schedule(() -> strings.add("also runnable"), TimeValue.MINUS_ONE);
runAllTasks(taskQueue);
taskQueue.runAllTasks();
assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis));
assertThat(strings, contains("runnable", "also runnable", "deferred"));
@ -295,22 +295,10 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
runAllTasks(taskQueue);
taskQueue.runAllTasks();
assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis + delayMillis1));
assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred"));
}
private static void runAllTasks(DeterministicTaskQueue taskQueue) {
while (true) {
while (taskQueue.hasRunnableTasks()) {
taskQueue.runNextTask();
}
if (taskQueue.hasDeferredTasks()) {
taskQueue.advanceTime();
} else {
break;
}
}
assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred"));
}
private static DeterministicTaskQueue newTaskQueue() {

View File

@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.coordination.PeersResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
/**
 * Checks equality, hash code and serialization round-trips of {@link PeersRequest} and {@link PeersResponse}.
 */
public class PeerFinderMessagesTests extends ESTestCase {

    private DiscoveryNode createNode(String id) {
        return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT);
    }

    public void testPeersRequestEqualsHashCodeSerialization() {
        // the source node is created before the peers, matching the order in which the random values are drawn
        final DiscoveryNode sourceNode = createNode(randomAlphaOfLength(10));
        final List<DiscoveryNode> knownPeers
            = Arrays.stream(generateRandomStringArray(10, 10, false)).map(this::createNode).collect(Collectors.toList());
        final PeersRequest initialPeersRequest = new PeersRequest(sourceNode, knownPeers);

        EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPeersRequest,
            original -> copyWriteable(original, writableRegistry(), PeersRequest::new),
            original -> {
                final List<DiscoveryNode> peersCopy = new ArrayList<>(original.getKnownPeers());
                // mutate either the source node or the list of known peers
                return randomBoolean()
                    ? new PeersRequest(createNode(randomAlphaOfLength(10)), peersCopy)
                    : new PeersRequest(original.getSourceNode(), modifyDiscoveryNodesList(original.getKnownPeers(), true));
            });
    }

    public void testPeersResponseEqualsHashCodeSerialization() {
        final long initialTerm = randomNonNegativeLong();
        // a response either names a master and lists no peers, or names no master and lists at least one peer
        final PeersResponse initialPeersResponse = randomBoolean()
            ? new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), emptyList(), initialTerm)
            : new PeersResponse(Optional.empty(),
                Arrays.stream(generateRandomStringArray(10, 10, false, false)).map(this::createNode).collect(Collectors.toList()),
                initialTerm);

        EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPeersResponse,
            original -> copyWriteable(original, writableRegistry(), PeersResponse::new),
            original -> {
                final long term = original.getTerm();
                if (randomBoolean()) {
                    // change only the term
                    return new PeersResponse(original.getMasterNode(), original.getKnownPeers(),
                        randomValueOtherThan(term, ESTestCase::randomNonNegativeLong));
                }
                if (original.getMasterNode().isPresent()) {
                    // replace the master, or drop it in favour of a single peer
                    return randomBoolean()
                        ? new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), original.getKnownPeers(), term)
                        : new PeersResponse(Optional.empty(), singletonList(createNode(randomAlphaOfLength(10))), term);
                }
                // drop the peers in favour of a master, or alter the peers without emptying the list
                return randomBoolean()
                    ? new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), emptyList(), term)
                    : new PeersResponse(original.getMasterNode(), modifyDiscoveryNodesList(original.getKnownPeers(), false), term);
            });
    }

    /**
     * Returns a copy of the given nodes with one node removed, replaced or added.
     *
     * @param allowEmpty whether the removal may leave the returned list empty.
     */
    private List<DiscoveryNode> modifyDiscoveryNodesList(Collection<DiscoveryNode> originalNodes, boolean allowEmpty) {
        final List<DiscoveryNode> modifiedNodes = new ArrayList<>(originalNodes);
        final boolean hasNodes = modifiedNodes.isEmpty() == false;
        if (hasNodes && randomBoolean() && (allowEmpty || modifiedNodes.size() > 1)) {
            final int indexToRemove = randomIntBetween(0, modifiedNodes.size() - 1);
            modifiedNodes.remove(indexToRemove);
        } else if (hasNodes && randomBoolean()) {
            final int indexToReplace = randomIntBetween(0, modifiedNodes.size() - 1);
            modifiedNodes.set(indexToReplace, createNode(randomAlphaOfLength(10)));
        } else {
            modifiedNodes.add(createNode(randomAlphaOfLength(10)));
        }
        return modifiedNodes;
    }
}

View File

@ -0,0 +1,703 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.coordination.FutureExecutor;
import org.elasticsearch.cluster.coordination.PeersResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.elasticsearch.discovery.PeerFinder.REQUEST_PEERS_ACTION_NAME;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class PeerFinderTests extends ESTestCase {
private CapturingTransport capturingTransport;
private DeterministicTaskQueue deterministicTaskQueue;
private DiscoveryNode localNode;
private MockTransportAddressConnector transportAddressConnector;
private TestPeerFinder peerFinder;
private List<TransportAddress> providedAddresses;
private long addressResolveDelay; // -1 means address resolution fails
private Set<DiscoveryNode> disconnectedNodes = new HashSet<>();
private Set<DiscoveryNode> connectedNodes = new HashSet<>();
private DiscoveryNodes lastAcceptedNodes;
private TransportService transportService;
private static long CONNECTION_TIMEOUT_MILLIS = 30000;
/**
 * A {@link TransportAddressConnector} whose connection attempts complete via the deterministic task queue. An attempt fails if the
 * address is in {@code unreachableAddresses} or belongs to a non-master node in {@code reachableNodes}, succeeds if it belongs to a
 * master node in {@code reachableNodes}, and completes only after {@code CONNECTION_TIMEOUT_MILLIS} if the address is in
 * {@code slowAddresses}. At most one attempt per address may be in flight at any time.
 */
class MockTransportAddressConnector implements TransportAddressConnector {
    final Set<DiscoveryNode> reachableNodes = new HashSet<>();
    final Set<TransportAddress> unreachableAddresses = new HashSet<>();
    final Set<TransportAddress> slowAddresses = new HashSet<>();
    final Set<TransportAddress> inFlightConnectionAttempts = new HashSet<>();

    @Override
    public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener) {
        assert localNode.getAddress().equals(transportAddress) == false : "should not probe local node";

        final boolean isNotInFlight = inFlightConnectionAttempts.add(transportAddress);
        assertTrue(isNotInFlight);

        final long connectResultTime = deterministicTaskQueue.getCurrentTimeMillis()
            + (slowAddresses.contains(transportAddress) ? CONNECTION_TIMEOUT_MILLIS : 0);

        deterministicTaskQueue.scheduleAt(connectResultTime, new Runnable() {
            @Override
            public void run() {
                if (unreachableAddresses.contains(transportAddress)) {
                    assertTrue(inFlightConnectionAttempts.remove(transportAddress));
                    listener.onFailure(new IOException("cannot connect to " + transportAddress));
                    return;
                }

                for (final DiscoveryNode discoveryNode : reachableNodes) {
                    if (discoveryNode.getAddress().equals(transportAddress)) {
                        // The attempt is over whether or not the node turns out to be master-eligible, so stop tracking it
                        // before notifying the listener; otherwise a later probe of a non-master address would be rejected
                        // as a duplicate in-flight attempt.
                        assertTrue(inFlightConnectionAttempts.remove(transportAddress));
                        if (discoveryNode.isMasterNode()) {
                            disconnectedNodes.remove(discoveryNode);
                            connectedNodes.add(discoveryNode);
                            listener.onResponse(discoveryNode);
                        } else {
                            listener.onFailure(new ElasticsearchException("non-master node " + discoveryNode));
                        }
                        return;
                    }
                }

                throw new AssertionError(transportAddress + " unknown");
            }

            @Override
            public String toString() {
                return "connection attempt to " + transportAddress;
            }
        });
    }
}
/**
 * A {@link PeerFinder} that resolves configured hosts via the enclosing test and records the master node and term passed to
 * {@link #onActiveMasterFound(DiscoveryNode, long)}, which it expects to be called at most once.
 */
class TestPeerFinder extends PeerFinder {
    DiscoveryNode discoveredMasterNode;
    OptionalLong discoveredMasterTerm = OptionalLong.empty();

    TestPeerFinder(Settings settings,
                   TransportService transportService,
                   FutureExecutor futureExecutor,
                   TransportAddressConnector transportAddressConnector) {
        super(settings, transportService, futureExecutor, transportAddressConnector, PeerFinderTests.this::resolveConfiguredHosts);
    }

    @Override
    protected void onActiveMasterFound(DiscoveryNode masterNode, long term) {
        assert holdsLock() == false : "PeerFinder lock held in error";
        // no master may have been recorded before this call
        assertThat(this.discoveredMasterNode, nullValue());
        assertFalse(this.discoveredMasterTerm.isPresent());
        this.discoveredMasterNode = masterNode;
        this.discoveredMasterTerm = OptionalLong.of(term);
    }
}
/**
 * Simulates resolution of the configured hosts: after {@code addressResolveDelay} milliseconds the consumer receives
 * {@code providedAddresses}. A delay of -1 simulates a failed resolution, in which case the consumer is never called.
 */
private void resolveConfiguredHosts(Consumer<List<TransportAddress>> onResult) {
    if (addressResolveDelay < 0) {
        // the only negative value with a defined meaning is -1
        assertThat(addressResolveDelay, is(-1L));
        return;
    }

    final long resolutionTime = deterministicTaskQueue.getCurrentTimeMillis() + addressResolveDelay;
    deterministicTaskQueue.scheduleAt(resolutionTime, new Runnable() {
        @Override
        public void run() {
            onResult.accept(providedAddresses);
        }

        @Override
        public String toString() {
            return "PeerFinderTests#resolveConfiguredHosts";
        }
    });
}
/**
 * Replaces {@code lastAcceptedNodes} with a copy that has been modified by the given callback.
 *
 * @param onBuilder applied to a builder seeded from the current {@code lastAcceptedNodes}
 */
private void updateLastAcceptedNodes(Consumer<DiscoveryNodes.Builder> onBuilder) {
    final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(lastAcceptedNodes);
    onBuilder.accept(nodesBuilder);
    final DiscoveryNodes updatedNodes = nodesBuilder.build();
    lastAcceptedNodes = updatedNodes;
}
/**
 * Builds a fresh fixture before each test: a capturing transport whose connection state is driven by the
 * {@code connectedNodes} / {@code disconnectedNodes} sets, a mock address connector, a deterministic task queue,
 * a started transport service for the local node, and the {@link TestPeerFinder} under test.
 */
@Before
public void setup() {
    capturingTransport = new CapturingTransport() {
        @Override
        public boolean nodeConnected(DiscoveryNode node) {
            final boolean connected = connectedNodes.contains(node);
            final boolean disconnected = disconnectedNodes.contains(node);
            // every node asked about must be in exactly one of the two sets
            assert connected != disconnected : node + ": isConnected=" + connected + ", isDisconnected=" + disconnected;
            return connected;
        }
    };
    transportAddressConnector = new MockTransportAddressConnector();

    // by default no addresses are provided and resolution completes without delay
    addressResolveDelay = 0L;
    providedAddresses = new ArrayList<>();

    final Settings nodeSettings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
    deterministicTaskQueue = new DeterministicTaskQueue(nodeSettings);

    localNode = newDiscoveryNode("local-node");
    transportService = new TransportService(
        nodeSettings,
        capturingTransport,
        deterministicTaskQueue.getThreadPool(),
        TransportService.NOOP_TRANSPORT_INTERCEPTOR,
        boundAddress -> localNode,
        null,
        emptySet());
    transportService.start();
    transportService.acceptIncomingRequests();

    // the last-accepted nodes initially contain only the local node
    lastAcceptedNodes = DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build();

    peerFinder = new TestPeerFinder(
        nodeSettings,
        transportService,
        deterministicTaskQueue.getFutureExecutor(),
        transportAddressConnector);
}
/**
 * Runs after each test: deactivates the peer finder (passing the local node), runs all tasks on the deterministic
 * task queue, and then asserts that the peer finder is inactive with no known peers.
 */
@After
public void deactivateAndRunRemainingTasks() {
peerFinder.deactivate(localNode);
deterministicTaskQueue.runAllTasks(); // termination ensures that everything is properly cleaned up
peerFinder.assertInactiveWithNoKnownPeers(); // should eventually have no nodes when deactivated
}
/**
 * A reachable node whose address is among the provided addresses is reported as a found peer once the peer
 * finder has been activated and all runnable tasks have run.
 */
public void testAddsReachableNodesFromUnicastHostsList() {
    final DiscoveryNode hostsListNode = newDiscoveryNode("node-from-hosts-list");
    transportAddressConnector.reachableNodes.add(hostsListNode);
    providedAddresses.add(hostsListNode.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();

    assertFoundPeers(hostsListNode);
}
/**
 * When address resolution only completes after a delay, the node is not found immediately but is found once
 * the delay plus one find-peers interval has elapsed.
 */
public void testAddsReachableNodesFromUnicastHostsListProvidedLater() {
    final DiscoveryNode lateNode = newDiscoveryNode("node-from-hosts-list");
    transportAddressConnector.reachableNodes.add(lateNode);
    providedAddresses.add(lateNode.getAddress());
    addressResolveDelay = 10000;

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    // the delayed resolution task is not yet runnable, so no peers are known
    assertFoundPeers();

    final long findPeersIntervalMillis = PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis();
    final long successTime = addressResolveDelay + findPeersIntervalMillis;
    while (deterministicTaskQueue.getCurrentTimeMillis() < successTime) {
        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks();
    }

    assertFoundPeers(lateNode);
}
/**
 * With {@code addressResolveDelay} set to -1 the resolution callback is never invoked; the peer finder keeps
 * running for the whole simulated period and simply finds no peers.
 */
public void testDoesNotRequireAddressResolutionToSucceed() {
    final DiscoveryNode unresolvedNode = newDiscoveryNode("node-from-hosts-list");
    transportAddressConnector.reachableNodes.add(unresolvedNode);
    providedAddresses.add(unresolvedNode.getAddress());
    addressResolveDelay = -1;

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    assertFoundPeers();

    final long findPeersIntervalMillis = PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis();
    final long successTime = 10000 + findPeersIntervalMillis;
    while (deterministicTaskQueue.getCurrentTimeMillis() < successTime) {
        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks();
    }

    // still nothing: the address was never handed to the peer finder
    assertFoundPeers();
}
/**
 * A provided address that the mock connector treats as unreachable does not result in a found peer.
 */
public void testDoesNotAddUnreachableNodesFromUnicastHostsList() {
    final DiscoveryNode unreachableNode = newDiscoveryNode("node-from-hosts-list");
    transportAddressConnector.unreachableAddresses.add(unreachableNode.getAddress());
    providedAddresses.add(unreachableNode.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();

    assertFoundPeers();
}
/**
 * A reachable node that is not master-eligible is neither reported as a peer nor sent any request.
 */
public void testDoesNotAddNonMasterEligibleNodesFromUnicastHostsList() {
    // NOTE(review): presumably the empty set is the node's roles, making it non-master-eligible -- confirm
    final TransportAddress nonMasterAddress = buildNewFakeTransportAddress();
    final DiscoveryNode nonMasterNode
        = new DiscoveryNode("node-from-hosts-list", nonMasterAddress, emptyMap(), emptySet(), Version.CURRENT);
    transportAddressConnector.reachableNodes.add(nonMasterNode);
    providedAddresses.add(nonMasterNode.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();

    assertFoundPeers();
    assertThat(capturingTransport.capturedRequests(), emptyArray());
}
/**
 * An address added to the provided addresses after activation is picked up once time has advanced and the
 * resulting tasks have run.
 */
public void testChecksUnicastHostsForChanges() {
    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    // nothing was provided at activation time
    assertFoundPeers();

    final DiscoveryNode addedLaterNode = newDiscoveryNode("node-from-hosts-list");
    transportAddressConnector.reachableNodes.add(addedLaterNode);
    providedAddresses.add(addedLaterNode.getAddress());

    deterministicTaskQueue.advanceTime();
    runAllRunnableTasks();

    assertFoundPeers(addedLaterNode);
}
/**
 * Peers found during one activation are not carried over: after deactivating, clearing the provided addresses
 * and activating again, no peers are reported.
 */
public void testDeactivationClearsPastKnowledge() {
    final DiscoveryNode previouslyKnownNode = newDiscoveryNode("node-from-hosts-list");
    transportAddressConnector.reachableNodes.add(previouslyKnownNode);
    providedAddresses.add(previouslyKnownNode.getAddress());

    // first activation discovers the node
    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    assertFoundPeers(previouslyKnownNode);

    peerFinder.deactivate(localNode);
    providedAddresses.clear();

    // second activation has no source from which to learn about the node
    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    assertFoundPeers();
}
/**
 * A reachable node contained in the last-accepted nodes passed to {@code activate} is reported as a found peer.
 */
public void testAddsReachableNodesFromClusterState() {
    final DiscoveryNode clusterStateNode = newDiscoveryNode("node-in-cluster-state");
    transportAddressConnector.reachableNodes.add(clusterStateNode);
    updateLastAcceptedNodes(nodesBuilder -> nodesBuilder.add(clusterStateNode));

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();

    assertFoundPeers(clusterStateNode);
}
/**
 * A node contained in the last-accepted nodes whose address is unreachable is not reported as a found peer.
 */
public void testDoesNotAddUnreachableNodesFromClusterState() {
    final DiscoveryNode unreachableNode = newDiscoveryNode("node-in-cluster-state");
    transportAddressConnector.unreachableAddresses.add(unreachableNode.getAddress());
    updateLastAcceptedNodes(nodesBuilder -> nodesBuilder.add(unreachableNode));

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();

    assertFoundPeers();
}
/**
 * Handling a peers request makes both its source node and the peers it lists discoverable, provided they are
 * reachable.
 */
public void testAddsReachableNodesFromIncomingRequests() {
    final DiscoveryNode requester = newDiscoveryNode("request-source");
    final DiscoveryNode peerKnownToRequester = newDiscoveryNode("other-known-node");
    transportAddressConnector.reachableNodes.add(requester);
    transportAddressConnector.reachableNodes.add(peerKnownToRequester);

    peerFinder.activate(lastAcceptedNodes);
    final PeersRequest incomingRequest = new PeersRequest(requester, Collections.singletonList(peerKnownToRequester));
    peerFinder.handlePeersRequest(incomingRequest);
    runAllRunnableTasks();

    assertFoundPeers(requester, peerKnownToRequester);
}
/**
 * A peer listed in an incoming request is ignored if its address is unreachable; the reachable source node is
 * still found.
 */
public void testDoesNotAddUnreachableNodesFromIncomingRequests() {
    final DiscoveryNode requester = newDiscoveryNode("request-source");
    final DiscoveryNode unreachablePeer = newDiscoveryNode("other-known-node");
    transportAddressConnector.reachableNodes.add(requester);
    transportAddressConnector.unreachableAddresses.add(unreachablePeer.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    final PeersRequest incomingRequest = new PeersRequest(requester, Collections.singletonList(unreachablePeer));
    peerFinder.handlePeersRequest(incomingRequest);
    runAllRunnableTasks();

    assertFoundPeers(requester);
}
/**
 * The source node of an incoming request is ignored if its address is unreachable; a reachable peer listed in
 * the same request is still found.
 */
public void testDoesNotAddUnreachableSourceNodeFromIncomingRequests() {
    final DiscoveryNode unreachableRequester = newDiscoveryNode("request-source");
    final DiscoveryNode reachablePeer = newDiscoveryNode("other-known-node");
    transportAddressConnector.unreachableAddresses.add(unreachableRequester.getAddress());
    transportAddressConnector.reachableNodes.add(reachablePeer);

    peerFinder.activate(lastAcceptedNodes);
    final PeersRequest incomingRequest = new PeersRequest(unreachableRequester, Collections.singletonList(reachablePeer));
    peerFinder.handlePeersRequest(incomingRequest);
    runAllRunnableTasks();

    assertFoundPeers(reachablePeer);
}
/**
 * While active, the peer finder answers peers requests itself: the response carries no master, the peers known
 * at that moment, and the current term.
 */
public void testRespondsToRequestWhenActive() {
    final DiscoveryNode requester = newDiscoveryNode("request-source");
    transportAddressConnector.reachableNodes.add(requester);
    peerFinder.activate(lastAcceptedNodes);

    // first request: answered before any task has run
    final PeersRequest firstRequest = new PeersRequest(requester, Collections.emptyList());
    final PeersResponse responseBeforeDiscovery = peerFinder.handlePeersRequest(firstRequest);
    assertFalse(responseBeforeDiscovery.getMasterNode().isPresent());
    assertThat(responseBeforeDiscovery.getKnownPeers(), empty()); // sourceNode is not yet known
    assertThat(responseBeforeDiscovery.getTerm(), is(0L));

    runAllRunnableTasks();
    assertFoundPeers(requester);

    // second request: the requester is now a known peer and the term has been changed
    final long updatedTerm = randomNonNegativeLong();
    peerFinder.setCurrentTerm(updatedTerm);
    final PeersRequest secondRequest = new PeersRequest(requester, Collections.emptyList());
    final PeersResponse responseAfterDiscovery = peerFinder.handlePeersRequest(secondRequest);
    assertFalse(responseAfterDiscovery.getMasterNode().isPresent());
    assertThat(responseAfterDiscovery.getKnownPeers(), contains(requester));
    assertThat(responseAfterDiscovery.getTerm(), is(updatedTerm));
}
/**
 * After deactivation with a given master node, a peers request is answered with that master, an empty peer
 * list and the term that was set before deactivation.
 */
public void testDelegatesRequestHandlingWhenInactive() {
    final DiscoveryNode leader = newDiscoveryNode("master-node");
    final DiscoveryNode requester = newDiscoveryNode("request-source");
    transportAddressConnector.reachableNodes.add(requester);

    peerFinder.activate(DiscoveryNodes.EMPTY_NODES);
    final long currentTerm = randomNonNegativeLong();
    peerFinder.setCurrentTerm(currentTerm);
    peerFinder.deactivate(leader);

    final PeersResponse expectedResponse = new PeersResponse(Optional.of(leader), Collections.emptyList(), currentTerm);
    final PeersRequest incomingRequest = new PeersRequest(requester, Collections.emptyList());
    final PeersResponse actualResponse = peerFinder.handlePeersRequest(incomingRequest);

    assertThat(actualResponse, equalTo(expectedResponse));
}
/**
 * A peers request sent to the local node through the transport service is answered exactly once, and its
 * source node is subsequently found as a peer.
 */
public void testReceivesRequestsFromTransportService() {
    final DiscoveryNode requester = newDiscoveryNode("request-source");
    transportAddressConnector.reachableNodes.add(requester);
    peerFinder.activate(lastAcceptedNodes);

    final AtomicBoolean gotResponse = new AtomicBoolean();
    final PeersRequest outgoingRequest = new PeersRequest(requester, Collections.emptyList());
    final TransportResponseHandler<PeersResponse> responseHandler = new TransportResponseHandler<PeersResponse>() {
        @Override
        public void handleResponse(PeersResponse response) {
            // fails if a response is delivered more than once
            assertTrue(gotResponse.compareAndSet(false, true));
            assertFalse(response.getMasterNode().isPresent());
            assertThat(response.getKnownPeers(), empty()); // sourceNode is not yet known
            assertThat(response.getTerm(), is(0L));
        }

        @Override
        public void handleException(TransportException exp) {
            throw new AssertionError("unexpected", exp);
        }

        @Override
        public String executor() {
            return Names.SAME;
        }
    };
    transportService.sendRequest(localNode, REQUEST_PEERS_ACTION_NAME, outgoingRequest, responseHandler);

    runAllRunnableTasks();

    assertTrue(gotResponse.get());
    assertFoundPeers(requester);
}
/**
 * After discovering a peer, exactly one request has been captured by the transport, and that request lists the
 * discovered peer among its known peers.
 */
public void testRequestsPeersIncludingKnownPeersInRequest() {
    final DiscoveryNode knownPeer = newDiscoveryNode("node-from-hosts-list");
    transportAddressConnector.reachableNodes.add(knownPeer);
    providedAddresses.add(knownPeer.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    assertFoundPeers(knownPeer);

    final CapturedRequest[] sentRequests = capturingTransport.getCapturedRequestsAndClear();
    assertThat(sentRequests.length, is(1));

    final CapturedRequest onlyRequest = sentRequests[0];
    final PeersRequest sentPeersRequest = (PeersRequest) onlyRequest.request;
    assertThat(sentPeersRequest.getKnownPeers(), contains(knownPeer));
}
/**
 * A reachable node listed in the known peers of a response is added to the found peers.
 */
public void testAddsReachablePeersFromResponse() {
    final DiscoveryNode respondingNode = newDiscoveryNode("node-from-hosts-list");
    transportAddressConnector.reachableNodes.add(respondingNode);
    providedAddresses.add(respondingNode.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    assertFoundPeers(respondingNode);

    final DiscoveryNode gossipedNode = newDiscoveryNode("discovered-node");
    transportAddressConnector.reachableNodes.add(gossipedNode);

    // the already-known node tells us about one further peer
    respondToRequests(destination -> {
        assertThat(destination, is(respondingNode));
        return new PeersResponse(Optional.empty(), singletonList(gossipedNode), randomNonNegativeLong());
    });
    runAllRunnableTasks();

    assertFoundPeers(respondingNode, gossipedNode);
}
/**
 * A master named in a response from some other node is added to the found peers, but it is not reported via
 * {@code onActiveMasterFound}: the recorded master node and term stay unset.
 */
public void testAddsReachableMasterFromResponse() {
    final DiscoveryNode respondingNode = newDiscoveryNode("node-from-hosts-list");
    transportAddressConnector.reachableNodes.add(respondingNode);
    providedAddresses.add(respondingNode.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    assertFoundPeers(respondingNode);

    final DiscoveryNode claimedMaster = newDiscoveryNode("discovered-master");
    respondToRequests(destination -> {
        assertThat(destination, is(respondingNode));
        return new PeersResponse(Optional.of(claimedMaster), emptyList(), randomNonNegativeLong());
    });

    transportAddressConnector.reachableNodes.add(claimedMaster);
    runAllRunnableTasks();

    assertFoundPeers(respondingNode, claimedMaster);
    // the master was only heard about second-hand, so nothing has been recorded
    assertThat(peerFinder.discoveredMasterNode, nullValue());
    assertFalse(peerFinder.discoveredMasterTerm.isPresent());
}
/**
 * When a node responds naming itself as the master, {@code onActiveMasterFound} is invoked with that node and
 * the term from its response.
 */
public void testHandlesDiscoveryOfMasterFromResponseFromMaster() {
    final DiscoveryNode selfDeclaredMaster = newDiscoveryNode("node-from-hosts-list");
    transportAddressConnector.reachableNodes.add(selfDeclaredMaster);
    providedAddresses.add(selfDeclaredMaster.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    assertFoundPeers(selfDeclaredMaster);

    final long masterTerm = randomNonNegativeLong();
    respondToRequests(destination -> {
        assertThat(destination, is(selfDeclaredMaster));
        return new PeersResponse(Optional.of(selfDeclaredMaster), emptyList(), masterTerm);
    });
    runAllRunnableTasks();

    assertFoundPeers(selfDeclaredMaster);
    assertThat(peerFinder.discoveredMasterNode, is(selfDeclaredMaster));
    assertThat(peerFinder.discoveredMasterTerm, is(OptionalLong.of(masterTerm)));
}
/**
 * Within one round a known peer is sent a single request, even if it contacts us again; after time advances a
 * new request is sent, and the peer named in its response is then found.
 */
public void testOnlyRequestsPeersOncePerRoundButDoesRetryNextRound() {
    final DiscoveryNode requester = newDiscoveryNode("request-source");
    transportAddressConnector.reachableNodes.add(requester);
    peerFinder.activate(lastAcceptedNodes);

    peerFinder.handlePeersRequest(new PeersRequest(requester, emptyList()));
    runAllRunnableTasks();
    assertFoundPeers(requester);

    // answer the request that was sent to the newly-found peer
    respondToRequests(destination -> {
        assertThat(destination, is(requester));
        return new PeersResponse(Optional.empty(), singletonList(requester), randomNonNegativeLong());
    });

    // a second incoming request in the same round must not trigger another outgoing request
    peerFinder.handlePeersRequest(new PeersRequest(requester, emptyList()));
    runAllRunnableTasks();
    respondToRequests(destination -> {
        throw new AssertionError("there should have been no further requests");
    });

    final DiscoveryNode nextRoundNode = newDiscoveryNode("otherNode");
    transportAddressConnector.reachableNodes.add(nextRoundNode);

    // after time advances, the peer is asked again
    deterministicTaskQueue.advanceTime();
    runAllRunnableTasks();
    respondToRequests(destination -> {
        assertThat(destination, is(requester));
        return new PeersResponse(Optional.empty(), singletonList(nextRoundNode), randomNonNegativeLong());
    });
    runAllRunnableTasks();

    assertFoundPeers(requester, nextRoundNode);
}
/**
 * Once a peer has been found it stays found in the next round, even though the mock connector no longer knows
 * of any reachable node.
 */
public void testDoesNotReconnectToNodesOnceConnected() {
    final DiscoveryNode connectedPeer = newDiscoveryNode("node-from-hosts-list");
    transportAddressConnector.reachableNodes.add(connectedPeer);
    providedAddresses.add(connectedPeer.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    assertFoundPeers(connectedPeer);

    // the mock connector throws an AssertionError for an address it does not know, so a new attempt would fail
    transportAddressConnector.reachableNodes.clear();
    deterministicTaskQueue.advanceTime();
    runAllRunnableTasks();

    assertFoundPeers(connectedPeer);
}
/**
 * A previously-found peer that the transport reports as disconnected, and whose address has become
 * unreachable, is no longer reported as found in the next round.
 */
public void testDiscardsDisconnectedNodes() {
    final DiscoveryNode vanishingNode = newDiscoveryNode("original-node");
    transportAddressConnector.reachableNodes.add(vanishingNode);
    providedAddresses.add(vanishingNode.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    assertFoundPeers(vanishingNode);

    // mark the node as disconnected at the transport level ...
    connectedNodes.remove(vanishingNode);
    disconnectedNodes.add(vanishingNode);
    // ... and make any reconnection attempt fail
    transportAddressConnector.reachableNodes.clear();
    transportAddressConnector.unreachableAddresses.add(vanishingNode.getAddress());

    deterministicTaskQueue.advanceTime();
    runAllRunnableTasks();

    assertFoundPeers();
}
/**
 * While a slow connection attempt to an address is still in flight, no second attempt is made to that address.
 * Once the slow attempt has failed, a later round connects successfully and the node is found.
 */
public void testDoesNotMakeMultipleConcurrentConnectionAttemptsToOneAddress() {
    final DiscoveryNode slowNode = newDiscoveryNode("node-from-hosts-list");
    providedAddresses.add(slowNode.getAddress());
    transportAddressConnector.slowAddresses.add(slowNode.getAddress());
    transportAddressConnector.unreachableAddresses.add(slowNode.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    assertFoundPeers();

    deterministicTaskQueue.advanceTime();
    runAllRunnableTasks(); // MockTransportAddressConnector verifies no multiple connection attempts
    assertFoundPeers();

    // from now on, fresh attempts to the address would succeed immediately
    transportAddressConnector.reachableNodes.add(slowNode);
    transportAddressConnector.unreachableAddresses.clear();
    transportAddressConnector.slowAddresses.clear();

    // the original attempt is still pending until the connection timeout, so the node cannot be found before then
    while (deterministicTaskQueue.getCurrentTimeMillis() < CONNECTION_TIMEOUT_MILLIS) {
        assertFoundPeers();
        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks();
    }

    // need to wait for the connection to timeout, then for another wakeup, before discovering the peer
    final long findPeersIntervalMillis = PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis();
    final long expectedTime = CONNECTION_TIMEOUT_MILLIS + findPeersIntervalMillis;
    while (deterministicTaskQueue.getCurrentTimeMillis() < expectedTime) {
        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks();
    }

    assertFoundPeers(slowNode);
}
/**
 * When a found peer is reported as disconnected and a different node is now reachable at the same address,
 * the next round finds the new node in place of the old one.
 */
public void testReconnectsToDisconnectedNodes() {
    final DiscoveryNode originalNode = newDiscoveryNode("original-node");
    transportAddressConnector.reachableNodes.add(originalNode);
    providedAddresses.add(originalNode.getAddress());

    peerFinder.activate(lastAcceptedNodes);
    runAllRunnableTasks();
    assertFoundPeers(originalNode);

    // swap the reachable node for a different node at the very same address
    transportAddressConnector.reachableNodes.clear();
    final DiscoveryNode replacementNode = new DiscoveryNode("rebooted-node", originalNode.getAddress(), Version.CURRENT);
    transportAddressConnector.reachableNodes.add(replacementNode);

    // the transport now reports the original node as disconnected
    connectedNodes.remove(originalNode);
    disconnectedNodes.add(originalNode);

    deterministicTaskQueue.advanceTime();
    runAllRunnableTasks();

    assertFoundPeers(replacementNode);
}
/**
 * Takes every request captured by the transport since the last call (clearing the capture buffer), checks that
 * each is a peers request sent by the local node, and delivers a response to each one.
 *
 * @param responseFactory called once per captured request with that request's destination node; returns the
 *                        response to deliver for that request
 */
private void respondToRequests(Function<DiscoveryNode, PeersResponse> responseFactory) {
    final CapturedRequest[] capturedRequests = capturingTransport.getCapturedRequestsAndClear();
    for (final CapturedRequest capturedRequest : capturedRequests) {
        assertThat(capturedRequest.action, is(REQUEST_PEERS_ACTION_NAME));
        assertThat(capturedRequest.request, instanceOf(PeersRequest.class));
        final PeersRequest peersRequest = (PeersRequest) capturedRequest.request;
        assertThat(peersRequest.getSourceNode(), is(localNode));

        // Respond using this request's own id. Using the first captured request's id for every iteration would
        // route all responses to the first request whenever more than one request had been captured.
        capturingTransport.handleResponse(capturedRequest.requestId, responseFactory.apply(capturedRequest.node));
    }
}
/**
 * Asserts that the peers currently reported by {@code peerFinder.getFoundPeers()} are exactly the given nodes,
 * compared as sets (order and duplicates are ignored).
 *
 * @param expectedNodesArray the nodes expected to have been found; none means no peers are expected
 */
private void assertFoundPeers(DiscoveryNode... expectedNodesArray) {
    assertThat(
        StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toSet()),
        equalTo(Arrays.stream(expectedNodesArray).collect(Collectors.toSet())));
}
/**
 * Creates a node with the given id, a freshly-built fake transport address and the current version.
 *
 * @param nodeId the id to give the new node
 * @return the new node
 */
private DiscoveryNode newDiscoveryNode(String nodeId) {
    final TransportAddress fakeAddress = buildNewFakeTransportAddress();
    return new DiscoveryNode(nodeId, fakeAddress, Version.CURRENT);
}
/**
 * Convenience wrapper that calls {@code runAllRunnableTasks} on the deterministic task queue, passing the
 * test's {@code random()} instance.
 */
private void runAllRunnableTasks() {
deterministicTaskQueue.runAllRunnableTasks(random());
}
}