Add current cluster state version to zen pings and use them in master election (#20384)
During a networking partition, cluster states updates (like mapping changes or shard assignments) are committed if a majority of the masters node received the update correctly. This means that the current master has access to enough nodes in the cluster to continue to operate correctly. When the network partition heals, the isolated nodes catch up with the current state and get the changes they couldn't receive before. However, if a second partition happens while the cluster is still recovering from the previous one *and* the old master is put in the minority side, it may be that a new master is elected which did not yet catch up. If that happens, cluster state updates can be lost. This commit fixed 95% of this rare problem by adding the current cluster state version to `PingResponse` and use them when deciding which master to join (and thus casting the node's vote). Note: this doesn't fully mitigate the problem as a cluster state update which is issued concurrently with a network partition can be lost if the partition prevents the commit message (part of the two phased commit of cluster state updates) from reaching any single node in the majority side *and* the partition does allow for the master to acknowledge the change. We are working on a more comprehensive fix but that requires considerate work and is targeted at 6.0.
This commit is contained in:
parent
0556c93920
commit
577dcb3237
|
@ -54,8 +54,8 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
|
||||
import org.elasticsearch.env.Environment;
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
@ -27,8 +26,8 @@ import org.elasticsearch.common.settings.Setting.Property;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.ExtensionPoint;
|
||||
import org.elasticsearch.discovery.local.LocalDiscovery;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
||||
import org.elasticsearch.discovery.zen.ping.ZenPingService;
|
||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
||||
|
|
|
@ -17,11 +17,10 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery.zen.elect;
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import com.carrotsearch.hppc.ObjectContainer;
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -33,9 +32,11 @@ import org.elasticsearch.common.util.CollectionUtils;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -45,17 +46,64 @@ public class ElectMasterService extends AbstractComponent {
|
|||
public static final Setting<Integer> DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING =
|
||||
Setting.intSetting("discovery.zen.minimum_master_nodes", -1, Property.Dynamic, Property.NodeScope);
|
||||
|
||||
// This is the minimum version a master needs to be on, otherwise it gets ignored
|
||||
// This is based on the minimum compatible version of the current version this node is on
|
||||
private final Version minMasterVersion;
|
||||
private final NodeComparator nodeComparator = new NodeComparator();
|
||||
|
||||
private volatile int minimumMasterNodes;
|
||||
|
||||
/**
|
||||
* a class to encapsulate all the information about a candidate in a master election
|
||||
* that is needed to decided which of the candidates should win
|
||||
*/
|
||||
public static class MasterCandidate {
|
||||
|
||||
public static final long UNRECOVERED_CLUSTER_VERSION = -1;
|
||||
|
||||
final DiscoveryNode node;
|
||||
|
||||
final long clusterStateVersion;
|
||||
|
||||
public MasterCandidate(DiscoveryNode node, long clusterStateVersion) {
|
||||
Objects.requireNonNull(node);
|
||||
assert clusterStateVersion >= -1 : "got: " + clusterStateVersion;
|
||||
assert node.isMasterNode();
|
||||
this.node = node;
|
||||
this.clusterStateVersion = clusterStateVersion;
|
||||
}
|
||||
|
||||
public DiscoveryNode getNode() {
|
||||
return node;
|
||||
}
|
||||
|
||||
public long getClusterStateVersion() {
|
||||
return clusterStateVersion;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Candidate{" +
|
||||
"node=" + node +
|
||||
", clusterStateVersion=" + clusterStateVersion +
|
||||
'}';
|
||||
}
|
||||
|
||||
/**
|
||||
* compares two candidates to indicate which the a better master.
|
||||
* A higher cluster state version is better
|
||||
*
|
||||
* @return -1 if c1 is a batter candidate, 1 if c2.
|
||||
*/
|
||||
public static int compare(MasterCandidate c1, MasterCandidate c2) {
|
||||
// we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
|
||||
// list, so if c2 has a higher cluster state version, it needs to come first.
|
||||
int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
|
||||
if (ret == 0) {
|
||||
ret = compareNodes(c1.getNode(), c2.getNode());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
@Inject
|
||||
public ElectMasterService(Settings settings) {
|
||||
super(settings);
|
||||
this.minMasterVersion = Version.CURRENT.minimumCompatibilityVersion();
|
||||
this.minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
|
||||
logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
|
||||
}
|
||||
|
@ -69,16 +117,41 @@ public class ElectMasterService extends AbstractComponent {
|
|||
}
|
||||
|
||||
public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) {
|
||||
if (minimumMasterNodes < 1) {
|
||||
return true;
|
||||
}
|
||||
int count = 0;
|
||||
for (DiscoveryNode node : nodes) {
|
||||
if (node.isMasterNode()) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
return count >= minimumMasterNodes;
|
||||
return count > 0 && (minimumMasterNodes < 0 || count >= minimumMasterNodes);
|
||||
}
|
||||
|
||||
public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
|
||||
if (candidates.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (minimumMasterNodes < 1) {
|
||||
return true;
|
||||
}
|
||||
assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
|
||||
"duplicates ahead: " + candidates;
|
||||
return candidates.size() >= minimumMasterNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Elects a new master out of the possible nodes, returning it. Returns <tt>null</tt>
|
||||
* if no master has been elected.
|
||||
*/
|
||||
public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
|
||||
assert hasEnoughCandidates(candidates);
|
||||
List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
|
||||
sortedCandidates.sort(MasterCandidate::compare);
|
||||
return sortedCandidates.get(0);
|
||||
}
|
||||
|
||||
/** selects the best active master to join, where multiple are discovered */
|
||||
public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
|
||||
return activeMasters.stream().min(ElectMasterService::compareNodes).get();
|
||||
}
|
||||
|
||||
public boolean hasTooManyMasterNodes(Iterable<DiscoveryNode> nodes) {
|
||||
|
@ -107,7 +180,7 @@ public class ElectMasterService extends AbstractComponent {
|
|||
*/
|
||||
public List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) {
|
||||
ArrayList<DiscoveryNode> sortedNodes = CollectionUtils.iterableAsArrayList(nodes);
|
||||
CollectionUtil.introSort(sortedNodes, nodeComparator);
|
||||
CollectionUtil.introSort(sortedNodes, ElectMasterService::compareNodes);
|
||||
return sortedNodes;
|
||||
}
|
||||
|
||||
|
@ -130,25 +203,6 @@ public class ElectMasterService extends AbstractComponent {
|
|||
return nextPossibleMasters.toArray(new DiscoveryNode[nextPossibleMasters.size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Elects a new master out of the possible nodes, returning it. Returns <tt>null</tt>
|
||||
* if no master has been elected.
|
||||
*/
|
||||
public DiscoveryNode electMaster(Iterable<DiscoveryNode> nodes) {
|
||||
List<DiscoveryNode> sortedNodes = sortedMasterNodes(nodes);
|
||||
if (sortedNodes == null || sortedNodes.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
DiscoveryNode masterNode = sortedNodes.get(0);
|
||||
// Sanity check: maybe we don't end up here, because serialization may have failed.
|
||||
if (masterNode.getVersion().before(minMasterVersion)) {
|
||||
logger.warn("ignoring master [{}], because the version [{}] is lower than the minimum compatible version [{}]", masterNode, masterNode.getVersion(), minMasterVersion);
|
||||
return null;
|
||||
} else {
|
||||
return masterNode;
|
||||
}
|
||||
}
|
||||
|
||||
private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
|
||||
List<DiscoveryNode> possibleNodes = CollectionUtils.iterableAsArrayList(nodes);
|
||||
if (possibleNodes.isEmpty()) {
|
||||
|
@ -161,14 +215,12 @@ public class ElectMasterService extends AbstractComponent {
|
|||
it.remove();
|
||||
}
|
||||
}
|
||||
CollectionUtil.introSort(possibleNodes, nodeComparator);
|
||||
CollectionUtil.introSort(possibleNodes, ElectMasterService::compareNodes);
|
||||
return possibleNodes;
|
||||
}
|
||||
|
||||
private static class NodeComparator implements Comparator<DiscoveryNode> {
|
||||
|
||||
@Override
|
||||
public int compare(DiscoveryNode o1, DiscoveryNode o2) {
|
||||
/** master nodes go before other nodes, with a secondary sort by id **/
|
||||
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
|
||||
if (o1.isMasterNode() && !o2.isMasterNode()) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -178,4 +230,3 @@ public class ElectMasterService extends AbstractComponent {
|
|||
return o1.getId().compareTo(o2.getId());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -41,7 +41,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -56,7 +55,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.DiscoveryStats;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
|
||||
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
|
||||
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
||||
|
@ -76,13 +74,10 @@ import org.elasticsearch.transport.TransportService;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -146,9 +141,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
|
||||
private final JoinThreadControl joinThreadControl;
|
||||
|
||||
/** counts the time this node has joined the cluster or have elected it self as master */
|
||||
private final AtomicLong clusterJoinsCounter = new AtomicLong();
|
||||
|
||||
// must initialized in doStart(), when we have the allocationService set
|
||||
private volatile NodeJoinController nodeJoinController;
|
||||
private volatile NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
|
||||
|
@ -304,8 +296,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean nodeHasJoinedClusterOnce() {
|
||||
return clusterJoinsCounter.get() > 0;
|
||||
public ClusterState clusterState() {
|
||||
return clusterService.state();
|
||||
}
|
||||
|
||||
/** end of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */
|
||||
|
@ -406,8 +398,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
joinThreadControl.markThreadAsDone(currentThread);
|
||||
// we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
|
||||
nodesFD.updateNodesAndPing(state); // start the nodes FD
|
||||
long count = clusterJoinsCounter.incrementAndGet();
|
||||
logger.trace("cluster joins counter set to [{}] (elected as master)", count);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -764,9 +754,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
|
||||
// its a fresh update from the master as we transition from a start of not having a master to having one
|
||||
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
|
||||
long count = clusterJoinsCounter.incrementAndGet();
|
||||
logger.trace("updated cluster join cluster to [{}]", count);
|
||||
|
||||
return newClusterState;
|
||||
}
|
||||
|
||||
|
@ -873,16 +860,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
} else if (nodeJoinController == null) {
|
||||
throw new IllegalStateException("discovery module is not yet started");
|
||||
} else {
|
||||
// The minimum supported version for a node joining a master:
|
||||
Version minimumNodeJoinVersion = localNode().getVersion().minimumCompatibilityVersion();
|
||||
// Sanity check: maybe we don't end up here, because serialization may have failed.
|
||||
if (node.getVersion().before(minimumNodeJoinVersion)) {
|
||||
callback.onFailure(
|
||||
new IllegalStateException("Can't handle join request from a node with a version [" + node.getVersion() + "] that is lower than the minimum compatible version [" + minimumNodeJoinVersion.minimumCompatibilityVersion() + "]")
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// try and connect to the node, if it fails, we can raise an exception back to the client...
|
||||
transportService.connectToNode(node);
|
||||
|
||||
|
@ -901,14 +878,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
|
||||
private DiscoveryNode findMaster() {
|
||||
logger.trace("starting to ping");
|
||||
ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout);
|
||||
List<ZenPing.PingResponse> fullPingResponses = pingService.pingAndWait(pingTimeout).toList();
|
||||
if (fullPingResponses == null) {
|
||||
logger.trace("No full ping responses");
|
||||
return null;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
if (fullPingResponses.length == 0) {
|
||||
if (fullPingResponses.size() == 0) {
|
||||
sb.append(" {none}");
|
||||
} else {
|
||||
for (ZenPing.PingResponse pingResponse : fullPingResponses) {
|
||||
|
@ -918,69 +895,57 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
logger.trace("full ping responses:{}", sb);
|
||||
}
|
||||
|
||||
final DiscoveryNode localNode = clusterService.localNode();
|
||||
|
||||
// add our selves
|
||||
assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
|
||||
.filter(n -> n.equals(localNode)).findAny().isPresent() == false;
|
||||
|
||||
fullPingResponses.add(new ZenPing.PingResponse(localNode, null, clusterService.state()));
|
||||
|
||||
// filter responses
|
||||
final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
|
||||
|
||||
final DiscoveryNode localNode = clusterService.localNode();
|
||||
List<DiscoveryNode> pingMasters = new ArrayList<>();
|
||||
List<DiscoveryNode> activeMasters = new ArrayList<>();
|
||||
for (ZenPing.PingResponse pingResponse : pingResponses) {
|
||||
if (pingResponse.master() != null) {
|
||||
// We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
|
||||
// any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
|
||||
if (!localNode.equals(pingResponse.master())) {
|
||||
pingMasters.add(pingResponse.master());
|
||||
}
|
||||
if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
|
||||
activeMasters.add(pingResponse.master());
|
||||
}
|
||||
}
|
||||
|
||||
// nodes discovered during pinging
|
||||
Set<DiscoveryNode> activeNodes = new HashSet<>();
|
||||
// nodes discovered who has previously been part of the cluster and do not ping for the very first time
|
||||
Set<DiscoveryNode> joinedOnceActiveNodes = new HashSet<>();
|
||||
if (localNode.isMasterNode()) {
|
||||
activeNodes.add(localNode);
|
||||
long joinsCounter = clusterJoinsCounter.get();
|
||||
if (joinsCounter > 0) {
|
||||
logger.trace("adding local node to the list of active nodes that have previously joined the cluster (joins counter is [{}])", joinsCounter);
|
||||
joinedOnceActiveNodes.add(localNode);
|
||||
}
|
||||
}
|
||||
List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
|
||||
for (ZenPing.PingResponse pingResponse : pingResponses) {
|
||||
activeNodes.add(pingResponse.node());
|
||||
if (pingResponse.hasJoinedOnce()) {
|
||||
joinedOnceActiveNodes.add(pingResponse.node());
|
||||
if (pingResponse.node().isMasterNode()) {
|
||||
masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
|
||||
}
|
||||
}
|
||||
|
||||
if (pingMasters.isEmpty()) {
|
||||
if (electMaster.hasEnoughMasterNodes(activeNodes)) {
|
||||
// we give preference to nodes who have previously already joined the cluster. Those will
|
||||
// have a cluster state in memory, including an up to date routing table (which is not persistent to disk
|
||||
// by the gateway)
|
||||
DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes);
|
||||
if (master != null) {
|
||||
return master;
|
||||
}
|
||||
return electMaster.electMaster(activeNodes);
|
||||
if (activeMasters.isEmpty()) {
|
||||
if (electMaster.hasEnoughCandidates(masterCandidates)) {
|
||||
final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
|
||||
logger.trace("candidate {} won election", winner);
|
||||
return winner.getNode();
|
||||
} else {
|
||||
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
|
||||
logger.trace("not enough master nodes [{}]", activeNodes);
|
||||
logger.trace("not enough master nodes [{}]", masterCandidates);
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
|
||||
assert !pingMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
|
||||
assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
|
||||
// lets tie break between discovered nodes
|
||||
return electMaster.electMaster(pingMasters);
|
||||
return electMaster.tieBreakActiveMasters(activeMasters);
|
||||
}
|
||||
}
|
||||
|
||||
static List<ZenPing.PingResponse> filterPingResponses(ZenPing.PingResponse[] fullPingResponses, boolean masterElectionIgnoreNonMasters, Logger logger) {
|
||||
static List<ZenPing.PingResponse> filterPingResponses(List<ZenPing.PingResponse> fullPingResponses, boolean masterElectionIgnoreNonMasters, Logger logger) {
|
||||
List<ZenPing.PingResponse> pingResponses;
|
||||
if (masterElectionIgnoreNonMasters) {
|
||||
pingResponses = Arrays.stream(fullPingResponses).filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList());
|
||||
pingResponses = fullPingResponses.stream().filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList());
|
||||
} else {
|
||||
pingResponses = Arrays.asList(fullPingResponses);
|
||||
pingResponses = fullPingResponses;
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.discovery.zen.ping;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
|
||||
|
||||
/**
|
||||
|
@ -26,7 +27,7 @@ import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
|
|||
*/
|
||||
public interface PingContextProvider extends DiscoveryNodesProvider {
|
||||
|
||||
/** return true if this node has previously joined the cluster at least once. False if this is first join */
|
||||
boolean nodeHasJoinedClusterOnce();
|
||||
/** return the current cluster state of the node */
|
||||
ClusterState clusterState();
|
||||
|
||||
}
|
||||
|
|
|
@ -20,30 +20,42 @@
|
|||
package org.elasticsearch.discovery.zen.ping;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
|
||||
public interface ZenPing extends LifecycleComponent {
|
||||
|
||||
void setPingContextProvider(PingContextProvider contextProvider);
|
||||
|
||||
void ping(PingListener listener, TimeValue timeout);
|
||||
|
||||
public interface PingListener {
|
||||
interface PingListener {
|
||||
|
||||
void onPing(PingResponse[] pings);
|
||||
/**
|
||||
* called when pinging is done.
|
||||
*
|
||||
* @param pings ping result *must
|
||||
*/
|
||||
void onPing(Collection<PingResponse> pings);
|
||||
}
|
||||
|
||||
public static class PingResponse implements Streamable {
|
||||
class PingResponse implements Streamable {
|
||||
|
||||
public static final PingResponse[] EMPTY = new PingResponse[0];
|
||||
|
||||
|
@ -59,7 +71,7 @@ public interface ZenPing extends LifecycleComponent {
|
|||
|
||||
private DiscoveryNode master;
|
||||
|
||||
private boolean hasJoinedOnce;
|
||||
private long clusterStateVersion;
|
||||
|
||||
private PingResponse() {
|
||||
}
|
||||
|
@ -68,14 +80,21 @@ public interface ZenPing extends LifecycleComponent {
|
|||
* @param node the node which this ping describes
|
||||
* @param master the current master of the node
|
||||
* @param clusterName the cluster name of the node
|
||||
* @param hasJoinedOnce true if the joined has successfully joined the cluster before
|
||||
* @param clusterStateVersion the current cluster state version of that node
|
||||
* ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
|
||||
*/
|
||||
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, boolean hasJoinedOnce) {
|
||||
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
|
||||
this.id = idGenerator.incrementAndGet();
|
||||
this.node = node;
|
||||
this.master = master;
|
||||
this.clusterName = clusterName;
|
||||
this.hasJoinedOnce = hasJoinedOnce;
|
||||
this.clusterStateVersion = clusterStateVersion;
|
||||
}
|
||||
|
||||
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterState state) {
|
||||
this(node, master, state.getClusterName(),
|
||||
state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) ?
|
||||
ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION : state.version());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -100,9 +119,11 @@ public interface ZenPing extends LifecycleComponent {
|
|||
return master;
|
||||
}
|
||||
|
||||
/** true if the joined has successfully joined the cluster before */
|
||||
public boolean hasJoinedOnce() {
|
||||
return hasJoinedOnce;
|
||||
/**
|
||||
* the current cluster state version of that node ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION}
|
||||
* for not recovered) */
|
||||
public long getClusterStateVersion() {
|
||||
return clusterStateVersion;
|
||||
}
|
||||
|
||||
public static PingResponse readPingResponse(StreamInput in) throws IOException {
|
||||
|
@ -118,7 +139,7 @@ public interface ZenPing extends LifecycleComponent {
|
|||
if (in.readBoolean()) {
|
||||
master = new DiscoveryNode(in);
|
||||
}
|
||||
this.hasJoinedOnce = in.readBoolean();
|
||||
this.clusterStateVersion = in.readLong();
|
||||
this.id = in.readLong();
|
||||
}
|
||||
|
||||
|
@ -132,13 +153,14 @@ public interface ZenPing extends LifecycleComponent {
|
|||
out.writeBoolean(true);
|
||||
master.writeTo(out);
|
||||
}
|
||||
out.writeBoolean(hasJoinedOnce);
|
||||
out.writeLong(clusterStateVersion);
|
||||
out.writeLong(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "], hasJoinedOnce [" + hasJoinedOnce + "], cluster_name[" + clusterName.value() + "]}";
|
||||
return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "], cluster_state_version [" + clusterStateVersion
|
||||
+ "], cluster_name[" + clusterName.value() + "]}";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -146,7 +168,7 @@ public interface ZenPing extends LifecycleComponent {
|
|||
/**
|
||||
* a utility collection of pings where only the most recent ping is stored per node
|
||||
*/
|
||||
public static class PingCollection {
|
||||
class PingCollection {
|
||||
|
||||
Map<DiscoveryNode, PingResponse> pings;
|
||||
|
||||
|
@ -171,15 +193,15 @@ public interface ZenPing extends LifecycleComponent {
|
|||
}
|
||||
|
||||
/** adds multiple pings if newer than previous pings from the same node */
|
||||
public synchronized void addPings(PingResponse[] pings) {
|
||||
public synchronized void addPings(Iterable<PingResponse> pings) {
|
||||
for (PingResponse ping : pings) {
|
||||
addPing(ping);
|
||||
}
|
||||
}
|
||||
|
||||
/** serialize current pings to an array */
|
||||
public synchronized PingResponse[] toArray() {
|
||||
return pings.values().toArray(new PingResponse[pings.size()]);
|
||||
/** serialize current pings to a list. It is guaranteed that the list contains one ping response per node */
|
||||
public synchronized List<PingResponse> toList() {
|
||||
return new ArrayList<>(pings.values());
|
||||
}
|
||||
|
||||
/** the number of nodes for which there are known pings */
|
||||
|
|
|
@ -23,17 +23,15 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class ZenPingService extends AbstractLifecycleComponent implements ZenPing {
|
||||
public class ZenPingService extends AbstractLifecycleComponent {
|
||||
|
||||
private List<ZenPing> zenPings = Collections.emptyList();
|
||||
|
||||
|
@ -47,7 +45,6 @@ public class ZenPingService extends AbstractLifecycleComponent implements ZenPin
|
|||
return this.zenPings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPingContextProvider(PingContextProvider contextProvider) {
|
||||
if (lifecycle.started()) {
|
||||
throw new IllegalStateException("Can't set nodes provider when started");
|
||||
|
@ -78,60 +75,31 @@ public class ZenPingService extends AbstractLifecycleComponent implements ZenPin
|
|||
}
|
||||
}
|
||||
|
||||
public PingResponse[] pingAndWait(TimeValue timeout) {
|
||||
final AtomicReference<PingResponse[]> response = new AtomicReference<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
ping(new PingListener() {
|
||||
@Override
|
||||
public void onPing(PingResponse[] pings) {
|
||||
response.set(pings);
|
||||
public ZenPing.PingCollection pingAndWait(TimeValue timeout) {
|
||||
final ZenPing.PingCollection response = new ZenPing.PingCollection();
|
||||
final CountDownLatch latch = new CountDownLatch(zenPings.size());
|
||||
for (ZenPing zenPing : zenPings) {
|
||||
final AtomicBoolean counted = new AtomicBoolean();
|
||||
try {
|
||||
zenPing.ping(pings -> {
|
||||
response.addPings(pings);
|
||||
if (counted.compareAndSet(false, true)) {
|
||||
latch.countDown();
|
||||
}
|
||||
}, timeout);
|
||||
} catch (Exception ex) {
|
||||
logger.warn("Ping execution failed", ex);
|
||||
if (counted.compareAndSet(false, true)) {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
try {
|
||||
latch.await();
|
||||
return response.get();
|
||||
return response;
|
||||
} catch (InterruptedException e) {
|
||||
logger.trace("pingAndWait interrupted");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ping(PingListener listener, TimeValue timeout) {
|
||||
List<? extends ZenPing> zenPings = this.zenPings;
|
||||
CompoundPingListener compoundPingListener = new CompoundPingListener(listener, zenPings);
|
||||
for (ZenPing zenPing : zenPings) {
|
||||
try {
|
||||
zenPing.ping(compoundPingListener, timeout);
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
logger.debug("Ping execution rejected", ex);
|
||||
compoundPingListener.onPing(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class CompoundPingListener implements PingListener {
|
||||
|
||||
private final PingListener listener;
|
||||
|
||||
private final AtomicInteger counter;
|
||||
|
||||
private PingCollection responses = new PingCollection();
|
||||
|
||||
private CompoundPingListener(PingListener listener, List<? extends ZenPing> zenPings) {
|
||||
this.listener = listener;
|
||||
this.counter = new AtomicInteger(zenPings.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPing(PingResponse[] pings) {
|
||||
if (pings != null) {
|
||||
responses.addPings(pings);
|
||||
}
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
listener.onPing(responses.toArray());
|
||||
}
|
||||
return response;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
|
||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -63,6 +63,7 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -236,8 +237,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
|
|||
temporalResponses.clear();
|
||||
}
|
||||
|
||||
public PingResponse[] pingAndWait(TimeValue duration) {
|
||||
final AtomicReference<PingResponse[]> response = new AtomicReference<>();
|
||||
// test only
|
||||
Collection<PingResponse> pingAndWait(TimeValue duration) {
|
||||
final AtomicReference<Collection<PingResponse>> response = new AtomicReference<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
ping(pings -> {
|
||||
response.set(pings);
|
||||
|
@ -273,7 +275,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
|
|||
protected void doRun() throws Exception {
|
||||
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler);
|
||||
sendPingsHandler.close();
|
||||
listener.onPing(sendPingsHandler.pingCollection().toArray());
|
||||
listener.onPing(sendPingsHandler.pingCollection().toList());
|
||||
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
|
||||
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
|
||||
transportService.disconnectFromNode(node);
|
||||
|
@ -576,8 +578,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
|
|||
}
|
||||
|
||||
private PingResponse createPingResponse(DiscoveryNodes discoNodes) {
|
||||
return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), clusterName,
|
||||
contextProvider.nodeHasJoinedClusterOnce());
|
||||
return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), contextProvider.clusterState());
|
||||
}
|
||||
|
||||
static class UnicastPingResponse extends TransportResponse {
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.elasticsearch.action.DocWriteResponse;
|
|||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
|
|
@ -29,8 +29,8 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
|
|
@ -30,8 +30,8 @@ import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationD
|
|||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
|
||||
|
|
|
@ -18,13 +18,10 @@
|
|||
*/
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.inject.ModuleTestCase;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.local.LocalDiscovery;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.NoopDiscovery;
|
||||
|
||||
/**
|
||||
|
|
|
@ -49,8 +49,8 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
||||
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
||||
|
@ -110,9 +110,12 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
@ -164,7 +167,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
|
||||
private List<String> startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws
|
||||
ExecutionException, InterruptedException {
|
||||
configureUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
|
||||
configureCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
|
||||
List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
|
||||
ensureStableCluster(numberOfNodes);
|
||||
|
||||
|
@ -196,15 +199,15 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
return Arrays.asList(MockTransportService.TestPlugin.class);
|
||||
}
|
||||
|
||||
private void configureUnicastCluster(
|
||||
private void configureCluster(
|
||||
int numberOfNodes,
|
||||
@Nullable int[] unicastHostsOrdinals,
|
||||
int minimumMasterNode
|
||||
) throws ExecutionException, InterruptedException {
|
||||
configureUnicastCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
|
||||
configureCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
|
||||
}
|
||||
|
||||
private void configureUnicastCluster(
|
||||
private void configureCluster(
|
||||
Settings settings,
|
||||
int numberOfNodes,
|
||||
@Nullable int[] unicastHostsOrdinals,
|
||||
|
@ -1031,7 +1034,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void testClusterFormingWithASlowNode() throws Exception {
|
||||
configureUnicastCluster(3, null, 2);
|
||||
configureCluster(3, null, 2);
|
||||
|
||||
SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);
|
||||
|
||||
|
@ -1094,7 +1097,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
*/
|
||||
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
|
||||
// don't use DEFAULT settings (which can cause node disconnects on a slow CI machine)
|
||||
configureUnicastCluster(Settings.EMPTY, 3, null, 1);
|
||||
configureCluster(Settings.EMPTY, 3, null, 1);
|
||||
InternalTestCluster.Async<String> masterNodeFuture = internalCluster().startMasterOnlyNodeAsync();
|
||||
InternalTestCluster.Async<String> node_1Future = internalCluster().startDataOnlyNodeAsync();
|
||||
|
||||
|
@ -1135,7 +1138,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
|
||||
public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Exception {
|
||||
// test for https://github.com/elastic/elasticsearch/issues/8823
|
||||
configureUnicastCluster(2, null, 1);
|
||||
configureCluster(2, null, 1);
|
||||
String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY);
|
||||
internalCluster().startDataOnlyNode(Settings.EMPTY);
|
||||
|
||||
|
@ -1166,7 +1169,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
|
||||
.build();
|
||||
final String idxName = "test";
|
||||
configureUnicastCluster(settings, 3, null, 2);
|
||||
configureCluster(settings, 3, null, 2);
|
||||
InternalTestCluster.Async<List<String>> masterNodes = internalCluster().startMasterOnlyNodesAsync(2);
|
||||
InternalTestCluster.Async<String> dataNode = internalCluster().startDataOnlyNodeAsync();
|
||||
dataNode.get();
|
||||
|
@ -1195,6 +1198,61 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
assertFalse(client().admin().indices().prepareExists(idxName).get().isExists());
|
||||
}
|
||||
|
||||
public void testElectMasterWithLatestVersion() throws Exception {
|
||||
configureCluster(3, null, 2);
|
||||
final Set<String> nodes = new HashSet<>(internalCluster().startNodesAsync(3).get());
|
||||
ensureStableCluster(3);
|
||||
ServiceDisruptionScheme isolateAllNodes = new NetworkDisruption(new NetworkDisruption.IsolateAllNodes(nodes), new NetworkDisconnect());
|
||||
internalCluster().setDisruptionScheme(isolateAllNodes);
|
||||
|
||||
logger.info("--> forcing a complete election to make sure \"preferred\" master is elected");
|
||||
isolateAllNodes.startDisrupting();
|
||||
for (String node: nodes) {
|
||||
assertNoMaster(node);
|
||||
}
|
||||
isolateAllNodes.stopDisrupting();
|
||||
ensureStableCluster(3);
|
||||
final String preferredMasterName = internalCluster().getMasterName();
|
||||
final DiscoveryNode preferredMaster = internalCluster().clusterService(preferredMasterName).localNode();
|
||||
for (String node: nodes) {
|
||||
DiscoveryNode discoveryNode = internalCluster().clusterService(node).localNode();
|
||||
assertThat(discoveryNode.getId(), greaterThanOrEqualTo(preferredMaster.getId()));
|
||||
}
|
||||
|
||||
logger.info("--> preferred master is {}", preferredMaster);
|
||||
final Set<String> nonPreferredNodes = new HashSet<>(nodes);
|
||||
nonPreferredNodes.remove(preferredMasterName);
|
||||
final ServiceDisruptionScheme isolatePreferredMaster =
|
||||
new NetworkDisruption(
|
||||
new NetworkDisruption.TwoPartitions(
|
||||
Collections.singleton(preferredMasterName), nonPreferredNodes),
|
||||
new NetworkDisconnect());
|
||||
internalCluster().setDisruptionScheme(isolatePreferredMaster);
|
||||
isolatePreferredMaster.startDisrupting();
|
||||
|
||||
assertAcked(client(randomFrom(nonPreferredNodes)).admin().indices().prepareCreate("test").setSettings(
|
||||
INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1,
|
||||
INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0
|
||||
));
|
||||
|
||||
internalCluster().clearDisruptionScheme(false);
|
||||
internalCluster().setDisruptionScheme(isolateAllNodes);
|
||||
|
||||
logger.info("--> forcing a complete election again");
|
||||
isolateAllNodes.startDisrupting();
|
||||
for (String node: nodes) {
|
||||
assertNoMaster(node);
|
||||
}
|
||||
|
||||
isolateAllNodes.stopDisrupting();
|
||||
|
||||
final ClusterState state = client().admin().cluster().prepareState().get().getState();
|
||||
if (state.metaData().hasIndex("test") == false) {
|
||||
fail("index 'test' was lost. current cluster state: " + state.prettyPrint());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected NetworkDisruption addRandomDisruptionType(TwoPartitions partitions) {
|
||||
final NetworkLinkDisruptionType disruptionType;
|
||||
if (randomBoolean()) {
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService.MasterCandidate;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -31,6 +31,10 @@ import java.util.Collections;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
public class ElectMasterServiceTests extends ESTestCase {
|
||||
|
||||
|
@ -55,6 +59,22 @@ public class ElectMasterServiceTests extends ESTestCase {
|
|||
return nodes;
|
||||
}
|
||||
|
||||
List<MasterCandidate> generateRandomCandidates() {
|
||||
int count = scaledRandomIntBetween(1, 100);
|
||||
ArrayList<MasterCandidate> candidates = new ArrayList<>(count);
|
||||
for (int i = 0; i < count; i++) {
|
||||
Set<DiscoveryNode.Role> roles = new HashSet<>();
|
||||
roles.add(DiscoveryNode.Role.MASTER);
|
||||
DiscoveryNode node = new DiscoveryNode("n_" + i, "n_" + i, LocalTransportAddress.buildUnique(), Collections.emptyMap(),
|
||||
roles, Version.CURRENT);
|
||||
candidates.add(new MasterCandidate(node, randomBoolean() ? MasterCandidate.UNRECOVERED_CLUSTER_VERSION : randomPositiveLong()));
|
||||
}
|
||||
|
||||
Collections.shuffle(candidates, random());
|
||||
return candidates;
|
||||
}
|
||||
|
||||
|
||||
public void testSortByMasterLikelihood() {
|
||||
List<DiscoveryNode> nodes = generateRandomNodes();
|
||||
List<DiscoveryNode> sortedNodes = electMasterService().sortByMasterLikelihood(nodes);
|
||||
|
@ -69,36 +89,53 @@ public class ElectMasterServiceTests extends ESTestCase {
|
|||
}
|
||||
prevNode = node;
|
||||
}
|
||||
}
|
||||
|
||||
public void testTieBreakActiveMasters() {
|
||||
List<DiscoveryNode> nodes = generateRandomCandidates().stream().map(MasterCandidate::getNode).collect(Collectors.toList());
|
||||
DiscoveryNode bestMaster = electMasterService().tieBreakActiveMasters(nodes);
|
||||
for (DiscoveryNode node: nodes) {
|
||||
if (node.equals(bestMaster) == false) {
|
||||
assertTrue(bestMaster.getId().compareTo(node.getId()) < 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testHasEnoughNodes() {
|
||||
List<DiscoveryNode> nodes = rarely() ? Collections.emptyList() : generateRandomNodes();
|
||||
ElectMasterService service = electMasterService();
|
||||
int masterNodes = (int) nodes.stream().filter(DiscoveryNode::isMasterNode).count();
|
||||
service.minimumMasterNodes(randomIntBetween(-1, masterNodes));
|
||||
assertThat(service.hasEnoughMasterNodes(nodes), equalTo(masterNodes > 0));
|
||||
service.minimumMasterNodes(masterNodes + 1 + randomIntBetween(0, nodes.size()));
|
||||
assertFalse(service.hasEnoughMasterNodes(nodes));
|
||||
}
|
||||
|
||||
public void testHasEnoughCandidates() {
|
||||
List<MasterCandidate> candidates = rarely() ? Collections.emptyList() : generateRandomCandidates();
|
||||
ElectMasterService service = electMasterService();
|
||||
service.minimumMasterNodes(randomIntBetween(-1, candidates.size()));
|
||||
assertThat(service.hasEnoughCandidates(candidates), equalTo(candidates.size() > 0));
|
||||
service.minimumMasterNodes(candidates.size() + 1 + randomIntBetween(0, candidates.size()));
|
||||
assertFalse(service.hasEnoughCandidates(candidates));
|
||||
}
|
||||
|
||||
public void testElectMaster() {
|
||||
List<DiscoveryNode> nodes = generateRandomNodes();
|
||||
List<MasterCandidate> candidates = generateRandomCandidates();
|
||||
ElectMasterService service = electMasterService();
|
||||
int min_master_nodes = randomIntBetween(0, nodes.size());
|
||||
service.minimumMasterNodes(min_master_nodes);
|
||||
|
||||
int master_nodes = 0;
|
||||
for (DiscoveryNode node : nodes) {
|
||||
if (node.isMasterNode()) {
|
||||
master_nodes++;
|
||||
}
|
||||
}
|
||||
DiscoveryNode master = null;
|
||||
if (service.hasEnoughMasterNodes(nodes)) {
|
||||
master = service.electMaster(nodes);
|
||||
}
|
||||
|
||||
if (master_nodes == 0) {
|
||||
assertNull(master);
|
||||
} else if (min_master_nodes > 0 && master_nodes < min_master_nodes) {
|
||||
assertNull(master);
|
||||
} else {
|
||||
int minMasterNodes = randomIntBetween(0, candidates.size());
|
||||
service.minimumMasterNodes(minMasterNodes);
|
||||
MasterCandidate master = service.electMaster(candidates);
|
||||
assertNotNull(master);
|
||||
for (DiscoveryNode node : nodes) {
|
||||
if (node.isMasterNode()) {
|
||||
assertTrue(master.getId().compareTo(node.getId()) <= 0);
|
||||
}
|
||||
for (MasterCandidate candidate : candidates) {
|
||||
if (candidate.getNode().equals(master.getNode())) {
|
||||
// nothing much to test here
|
||||
} else if (candidate.getClusterStateVersion() == master.getClusterStateVersion()) {
|
||||
assertThat("candidate " + candidate + " has a lower or equal id than master " + master, candidate.getNode().getId(),
|
||||
greaterThan(master.getNode().getId()));
|
||||
} else {
|
||||
assertThat("candidate " + master + " has a higher cluster state version than candidate " + candidate,
|
||||
master.getClusterStateVersion(), greaterThan(candidate.getClusterStateVersion()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.BaseFuture;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
|
|
@ -34,14 +34,12 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoveryStats;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
||||
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
||||
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
|
||||
|
@ -60,10 +58,8 @@ import org.hamcrest.Matchers;
|
|||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -77,8 +73,6 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
|
||||
@ESIntegTestCase.SuppressLocalMode
|
||||
|
@ -293,44 +287,6 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testHandleNodeJoin_incompatibleMinVersion() throws UnknownHostException {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put("discovery.type", "zen") // <-- To override the local setting if set externally
|
||||
.build();
|
||||
String nodeName = internalCluster().startNode(nodeSettings);
|
||||
ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, nodeName);
|
||||
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
|
||||
DiscoveryNode node = new DiscoveryNode("_node_id", new InetSocketTransportAddress(InetAddress.getByName("0.0.0.0"), 0),
|
||||
emptyMap(), emptySet(), previousMajorVersion);
|
||||
final AtomicReference<IllegalStateException> holder = new AtomicReference<>();
|
||||
zenDiscovery.handleJoinRequest(node, clusterService.state(), new MembershipAction.JoinCallback() {
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
holder.set((IllegalStateException) e);
|
||||
}
|
||||
});
|
||||
|
||||
assertThat(holder.get(), notNullValue());
|
||||
assertThat(holder.get().getMessage(), equalTo("Can't handle join request from a node with a version [" + previousMajorVersion
|
||||
+ "] that is lower than the minimum compatible version [" + Version.CURRENT.minimumCompatibilityVersion() + "]"));
|
||||
}
|
||||
|
||||
public void testJoinElectedMaster_incompatibleMinVersion() {
|
||||
ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY);
|
||||
|
||||
DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), emptyMap(),
|
||||
Collections.singleton(DiscoveryNode.Role.MASTER), Version.CURRENT);
|
||||
assertThat(electMasterService.electMaster(Collections.singletonList(node)), sameInstance(node));
|
||||
node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), emptyMap(), emptySet(), previousMajorVersion);
|
||||
assertThat("Can't join master because version " + previousMajorVersion
|
||||
+ " is lower than the minimum compatable version " + Version.CURRENT + " can support",
|
||||
electMasterService.electMaster(Collections.singletonList(node)), nullValue());
|
||||
}
|
||||
|
||||
public void testDiscoveryStats() throws IOException {
|
||||
String expectedStatsJsonResponse = "{\n" +
|
||||
" \"discovery\" : {\n" +
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.LocalTransportAddress;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
||||
import org.elasticsearch.discovery.zen.ping.ZenPingService;
|
||||
import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener;
|
||||
|
@ -55,8 +54,8 @@ import java.util.stream.Collectors;
|
|||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
|
||||
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
|
||||
import static org.elasticsearch.discovery.zen.elect.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -128,7 +127,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
|
|||
Set<Role> roles = new HashSet<>(randomSubsetOf(Arrays.asList(Role.values())));
|
||||
DiscoveryNode node = new DiscoveryNode("node_" + i, "id_" + i, LocalTransportAddress.buildUnique(), Collections.emptyMap(),
|
||||
roles, Version.CURRENT);
|
||||
responses.add(new ZenPing.PingResponse(node, randomBoolean() ? null : node, new ClusterName("test"), randomBoolean()));
|
||||
responses.add(new ZenPing.PingResponse(node, randomBoolean() ? null : node, new ClusterName("test"), randomLong()));
|
||||
allNodes.add(node);
|
||||
if (node.isMasterNode()) {
|
||||
masterNodes.add(node);
|
||||
|
@ -136,8 +135,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
|
|||
}
|
||||
|
||||
boolean ignore = randomBoolean();
|
||||
List<ZenPing.PingResponse> filtered = ZenDiscovery.filterPingResponses(
|
||||
responses.toArray(new ZenPing.PingResponse[responses.size()]), ignore, logger);
|
||||
List<ZenPing.PingResponse> filtered = ZenDiscovery.filterPingResponses(responses, ignore, logger);
|
||||
final List<DiscoveryNode> filteredNodes = filtered.stream().map(ZenPing.PingResponse::node).collect(Collectors.toList());
|
||||
if (ignore) {
|
||||
assertThat(filteredNodes, equalTo(masterNodes));
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
|
@ -39,7 +40,7 @@ public class ZenPingTests extends ESTestCase {
|
|||
DiscoveryNode[] nodes = new DiscoveryNode[randomIntBetween(1, 30)];
|
||||
long maxIdPerNode[] = new long[nodes.length];
|
||||
DiscoveryNode masterPerNode[] = new DiscoveryNode[nodes.length];
|
||||
boolean hasJoinedOncePerNode[] = new boolean[nodes.length];
|
||||
long clusterStateVersionPerNode[] = new long[nodes.length];
|
||||
ArrayList<ZenPing.PingResponse> pings = new ArrayList<>();
|
||||
for (int i = 0; i < nodes.length; i++) {
|
||||
nodes[i] = new DiscoveryNode("" + i, LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
|
@ -51,9 +52,9 @@ public class ZenPingTests extends ESTestCase {
|
|||
if (randomBoolean()) {
|
||||
masterNode = nodes[randomInt(nodes.length - 1)];
|
||||
}
|
||||
boolean hasJoinedOnce = randomBoolean();
|
||||
long clusterStateVersion = randomLong();
|
||||
ZenPing.PingResponse ping = new ZenPing.PingResponse(nodes[node], masterNode, ClusterName.CLUSTER_NAME_SETTING.
|
||||
getDefault(Settings.EMPTY), hasJoinedOnce);
|
||||
getDefault(Settings.EMPTY), clusterStateVersion);
|
||||
if (rarely()) {
|
||||
// ignore some pings
|
||||
continue;
|
||||
|
@ -61,7 +62,7 @@ public class ZenPingTests extends ESTestCase {
|
|||
// update max ping info
|
||||
maxIdPerNode[node] = ping.id();
|
||||
masterPerNode[node] = masterNode;
|
||||
hasJoinedOncePerNode[node] = hasJoinedOnce;
|
||||
clusterStateVersionPerNode[node] = clusterStateVersion;
|
||||
pings.add(ping);
|
||||
}
|
||||
|
||||
|
@ -69,15 +70,15 @@ public class ZenPingTests extends ESTestCase {
|
|||
Collections.shuffle(pings, random());
|
||||
|
||||
ZenPing.PingCollection collection = new ZenPing.PingCollection();
|
||||
collection.addPings(pings.toArray(new ZenPing.PingResponse[pings.size()]));
|
||||
collection.addPings(pings);
|
||||
|
||||
ZenPing.PingResponse[] aggregate = collection.toArray();
|
||||
List<ZenPing.PingResponse> aggregate = collection.toList();
|
||||
|
||||
for (ZenPing.PingResponse ping : aggregate) {
|
||||
int nodeId = Integer.parseInt(ping.node().getId());
|
||||
assertThat(maxIdPerNode[nodeId], equalTo(ping.id()));
|
||||
assertThat(masterPerNode[nodeId], equalTo(ping.master()));
|
||||
assertThat(hasJoinedOncePerNode[nodeId], equalTo(ping.hasJoinedOnce()));
|
||||
assertThat(clusterStateVersionPerNode[nodeId], equalTo(ping.getClusterStateVersion()));
|
||||
|
||||
maxIdPerNode[nodeId] = -1; // mark as seen
|
||||
}
|
||||
|
|
|
@ -20,6 +20,9 @@
|
|||
package org.elasticsearch.discovery.zen.ping.unicast;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
|
@ -31,7 +34,7 @@ import org.elasticsearch.common.transport.TransportAddress;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
|
||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
|
@ -45,16 +48,18 @@ import org.elasticsearch.transport.TransportService;
|
|||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
public class UnicastZenPingIT extends ESTestCase {
|
||||
public class UnicastZenPingTests extends ESTestCase {
|
||||
public void testSimplePings() throws InterruptedException {
|
||||
int startPort = 11000 + randomIntBetween(0, 1000);
|
||||
int endPort = startPort + 10;
|
||||
|
@ -78,6 +83,8 @@ public class UnicastZenPingIT extends ESTestCase {
|
|||
Version versionD = VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion);
|
||||
NetworkHandle handleD = startServices(settingsMismatch, threadPool, networkService, "UZP_D", versionD);
|
||||
|
||||
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build();
|
||||
|
||||
Settings hostsSettings = Settings.builder()
|
||||
.putArray("discovery.zen.ping.unicast.hosts",
|
||||
NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())),
|
||||
|
@ -96,8 +103,8 @@ public class UnicastZenPingIT extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean nodeHasJoinedClusterOnce() {
|
||||
return false;
|
||||
public ClusterState clusterState() {
|
||||
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
|
||||
}
|
||||
});
|
||||
zenPingA.start();
|
||||
|
@ -110,8 +117,8 @@ public class UnicastZenPingIT extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean nodeHasJoinedClusterOnce() {
|
||||
return true;
|
||||
public ClusterState clusterState() {
|
||||
return state;
|
||||
}
|
||||
});
|
||||
zenPingB.start();
|
||||
|
@ -130,8 +137,8 @@ public class UnicastZenPingIT extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean nodeHasJoinedClusterOnce() {
|
||||
return false;
|
||||
public ClusterState clusterState() {
|
||||
return state;
|
||||
}
|
||||
});
|
||||
zenPingC.start();
|
||||
|
@ -144,36 +151,38 @@ public class UnicastZenPingIT extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean nodeHasJoinedClusterOnce() {
|
||||
return false;
|
||||
public ClusterState clusterState() {
|
||||
return state;
|
||||
}
|
||||
});
|
||||
zenPingD.start();
|
||||
|
||||
try {
|
||||
logger.info("ping from UZP_A");
|
||||
ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1));
|
||||
assertThat(pingResponses.length, equalTo(1));
|
||||
assertThat(pingResponses[0].node().getId(), equalTo("UZP_B"));
|
||||
assertTrue(pingResponses[0].hasJoinedOnce());
|
||||
Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1));
|
||||
assertThat(pingResponses.size(), equalTo(1));
|
||||
ZenPing.PingResponse ping = pingResponses.iterator().next();
|
||||
assertThat(ping.node().getId(), equalTo("UZP_B"));
|
||||
assertThat(ping.getClusterStateVersion(), equalTo(state.version()));
|
||||
assertCounters(handleA, handleA, handleB, handleC, handleD);
|
||||
|
||||
// ping again, this time from B,
|
||||
logger.info("ping from UZP_B");
|
||||
pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1));
|
||||
assertThat(pingResponses.length, equalTo(1));
|
||||
assertThat(pingResponses[0].node().getId(), equalTo("UZP_A"));
|
||||
assertFalse(pingResponses[0].hasJoinedOnce());
|
||||
assertThat(pingResponses.size(), equalTo(1));
|
||||
ping = pingResponses.iterator().next();
|
||||
assertThat(ping.node().getId(), equalTo("UZP_A"));
|
||||
assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION));
|
||||
assertCounters(handleB, handleA, handleB, handleC, handleD);
|
||||
|
||||
logger.info("ping from UZP_C");
|
||||
pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(1));
|
||||
assertThat(pingResponses.length, equalTo(0));
|
||||
assertThat(pingResponses.size(), equalTo(0));
|
||||
assertCounters(handleC, handleA, handleB, handleC, handleD);
|
||||
|
||||
logger.info("ping from UZP_D");
|
||||
pingResponses = zenPingD.pingAndWait(TimeValue.timeValueSeconds(1));
|
||||
assertThat(pingResponses.length, equalTo(0));
|
||||
assertThat(pingResponses.size(), equalTo(0));
|
||||
assertCounters(handleD, handleA, handleB, handleC, handleD);
|
||||
} finally {
|
||||
zenPingA.close();
|
|
@ -38,7 +38,7 @@ import org.elasticsearch.common.logging.Loggers;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.mapper.MapperParsingException;
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
|
|
|
@ -44,8 +44,8 @@ import org.elasticsearch.common.Priority;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.indices.ttl.IndicesTTLService;
|
||||
|
|
|
@ -63,6 +63,22 @@ to create new scenarios. We have currently ported all published Jepsen scenarios
|
|||
framework. As the Jepsen tests evolve, we will continue porting new scenarios that are not covered yet. We are committed to investigating
|
||||
all new scenarios and will report issues that we find on this page and in our GitHub repository.
|
||||
|
||||
[float]
|
||||
=== Repeated network partitions can cause cluster state updates to be lost (STATUS: ONGOING)
|
||||
|
||||
During a networking partition, cluster state updates (like mapping changes or shard assignments)
|
||||
are committed if a majority of the master-eligible nodes received the update correctly. This means that the current master has access
|
||||
to enough nodes in the cluster to continue to operate correctly. When the network partition heals, the isolated nodes catch
|
||||
up with the current state and receive the previously missed changes. However, if a second partition happens while the cluster
|
||||
is still recovering from the previous one *and* the old master falls on the minority side, it may be that a new master is elected
|
||||
which has not yet catch up. If that happens, cluster state updates can be lost.
|
||||
|
||||
This problem is mostly fixed by {GIT}20384[#20384] (v5.0.0), which takes committed cluster state updates into account during master
|
||||
election. This considerably reduces the chance of this rare problem occurring but does not fully mitigate it. If the second partition
|
||||
happens concurrently with a cluster state update and blocks the cluster state commit message from reaching a majority of nodes, it may be
|
||||
that the in flight update will be lost. If the now-isolated master can still acknowledge the cluster state update to the client this
|
||||
will amount to the loss of an acknowledged change. Fixing that last scenario needs considerable work and is currently targeted at (v6.0.0).
|
||||
|
||||
[float]
|
||||
=== Better request retry mechanism when nodes are disconnected (STATUS: ONGOING)
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
|
|||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
|
|
|
@ -1660,10 +1660,18 @@ public final class InternalTestCluster extends TestCluster {
|
|||
}
|
||||
|
||||
public void clearDisruptionScheme() {
|
||||
clearDisruptionScheme(true);
|
||||
}
|
||||
|
||||
public void clearDisruptionScheme(boolean ensureHealthyCluster) {
|
||||
if (activeDisruptionScheme != null) {
|
||||
TimeValue expectedHealingTime = activeDisruptionScheme.expectedTimeToHeal();
|
||||
logger.info("Clearing active scheme {}, expected healing time {}", activeDisruptionScheme, expectedHealingTime);
|
||||
if (ensureHealthyCluster) {
|
||||
activeDisruptionScheme.removeAndEnsureHealthy(this);
|
||||
} else {
|
||||
activeDisruptionScheme.removeFromCluster(this);
|
||||
}
|
||||
}
|
||||
activeDisruptionScheme = null;
|
||||
}
|
||||
|
|
|
@ -328,6 +328,18 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
|
|||
}
|
||||
}
|
||||
|
||||
public static class IsolateAllNodes extends DisruptedLinks {
|
||||
|
||||
public IsolateAllNodes(Set<String> nodes) {
|
||||
super(nodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean disrupt(String node1, String node2) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstract class representing various types of network disruptions. Instances of this class override the {@link #applyDisruption}
|
||||
* method to apply their specific disruption type to requests that are send from a source to a target node.
|
||||
|
|
|
@ -56,6 +56,21 @@ public class NetworkDisruptionTests extends ESTestCase {
|
|||
assertTrue(topology.getMajoritySide().size() >= topology.getMinoritySide().size());
|
||||
}
|
||||
|
||||
public void testIsolateAll() {
|
||||
Set<String> nodes = generateRandomStringSet(1, 10);
|
||||
NetworkDisruption.DisruptedLinks topology = new NetworkDisruption.IsolateAllNodes(nodes);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
final String node1 = randomFrom(nodes);
|
||||
final String node2 = randomFrom(nodes);
|
||||
if (node1.equals(node2)) {
|
||||
continue;
|
||||
}
|
||||
assertTrue(topology.nodes().contains(node1));
|
||||
assertTrue(topology.nodes().contains(node2));
|
||||
assertTrue(topology.disrupt(node1, node2));
|
||||
}
|
||||
}
|
||||
|
||||
public void testBridge() {
|
||||
Set<String> partition1 = generateRandomStringSet(1, 10);
|
||||
Set<String> partition2 = generateRandomStringSet(1, 10);
|
||||
|
|
Loading…
Reference in New Issue