Resiliency: Master election should demote nodes which try to join the cluster for the first time

With the change in #7493, we introduced a pinging round when a master node goes down. That pinging round helps validate the current state of the cluster and takes, by default, 3 seconds. It may be that during that window, a new node tries to join the cluster and starts pinging (this is typical when you quickly restart the current master). If this node gets elected as the new master it will force recovery from the gateway (it has no in-memory cluster state), which in turn will cause a full cluster shard synchronisation. While this is not a problem on its own, it's a shame. This commit demotes "new" nodes during master election so they will only be elected if really needed.

Closes #7558
This commit is contained in:
Boaz Leskes 2014-09-03 11:40:47 +02:00
parent 4ed52073fa
commit a50934ea3e
10 changed files with 233 additions and 86 deletions

View File

@ -52,6 +52,7 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
import org.elasticsearch.discovery.zen.membership.MembershipAction;
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
@ -69,6 +70,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
@ -76,7 +78,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
*
*/
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, PingContextProvider {
public final static String SETTING_REJOIN_ON_MASTER_GONE = "discovery.zen.rejoin_on_master_gone";
public final static String SETTING_PING_TIMEOUT = "discovery.zen.ping.timeout";
@ -139,6 +141,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private volatile boolean rejoinOnMasterGone;
/** counts the number of times this node has joined the cluster or has elected itself as master */
private final AtomicLong clusterJoinsCounter = new AtomicLong();
@Nullable
private NodeService nodeService;
@ -194,7 +199,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.nodesFD.addListener(new NodeFaultDetectionListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName);
this.pingService.setNodesProvider(this);
this.pingService.setPingContextProvider(this);
this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());
transportService.registerHandler(DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequestHandler());
@ -290,6 +295,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return clusterName.value() + "/" + localNode.id();
}
/** start of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */
@Override
public DiscoveryNodes nodes() {
DiscoveryNodes latestNodes = this.latestDiscoNodes;
@ -305,6 +311,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return this.nodeService;
}
@Override
public boolean nodeHasJoinedClusterOnce() {
// the counter is bumped each time this node joins a master or is elected as master,
// so any positive value means the node has been part of the cluster before
final long joins = clusterJoinsCounter.get();
return joins > 0;
}
/** end of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */
@Override
public void publish(ClusterState clusterState, AckListener ackListener) {
if (!master) {
@ -387,6 +401,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
long count = clusterJoinsCounter.incrementAndGet();
logger.trace("cluster joins counter set to [{}] (elected as master)", count);
}
});
} else {
@ -404,8 +420,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
masterFD.start(masterNode, "initial_join");
// no need to submit the received cluster state, we will get it from the master when it publishes
// the fact that we joined
long count = clusterJoinsCounter.incrementAndGet();
logger.trace("cluster joins counter set to [{}] (joined master)", count);
}
}
}
@ -922,7 +938,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
sb.append(" {none}");
} else {
for (ZenPing.PingResponse pingResponse : fullPingResponses) {
sb.append("\n\t--> ").append("target [").append(pingResponse.target()).append("], master [").append(pingResponse.master()).append("]");
sb.append("\n\t--> ").append(pingResponse);
}
}
logger.trace(sb.toString());
@ -931,7 +947,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// filter responses
List<ZenPing.PingResponse> pingResponses = Lists.newArrayList();
for (ZenPing.PingResponse pingResponse : fullPingResponses) {
DiscoveryNode node = pingResponse.target();
DiscoveryNode node = pingResponse.node();
if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
// filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
} else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
@ -947,7 +963,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
sb.append(" {none}");
} else {
for (ZenPing.PingResponse pingResponse : pingResponses) {
sb.append("\n\t--> ").append("target [").append(pingResponse.target()).append("], master [").append(pingResponse.master()).append("]");
sb.append("\n\t--> ").append(pingResponse);
}
}
logger.debug(sb.toString());
@ -963,20 +979,38 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
}
Set<DiscoveryNode> possibleMasterNodes = Sets.newHashSet();
// nodes discovered during pinging
Set<DiscoveryNode> activeNodes = Sets.newHashSet();
// discovered nodes which have previously been part of the cluster, i.e. are not pinging for the very first time
Set<DiscoveryNode> joinedOnceActiveNodes = Sets.newHashSet();
if (localNode.masterNode()) {
possibleMasterNodes.add(localNode);
activeNodes.add(localNode);
long joinsCounter = clusterJoinsCounter.get();
if (joinsCounter > 0) {
logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
joinedOnceActiveNodes.add(localNode);
}
}
for (ZenPing.PingResponse pingResponse : pingResponses) {
possibleMasterNodes.add(pingResponse.target());
activeNodes.add(pingResponse.node());
if (pingResponse.hasJoinedOnce()) {
joinedOnceActiveNodes.add(pingResponse.node());
}
}
if (pingMasters.isEmpty()) {
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
if (electMaster.hasEnoughMasterNodes(possibleMasterNodes)) {
return electMaster.electMaster(possibleMasterNodes);
if (electMaster.hasEnoughMasterNodes(activeNodes)) {
// we give preference to nodes who have previously already joined the cluster. Those will
// have a cluster state in memory, including an up to date routing table (which is not persistent to disk
// by the gateway)
DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes);
if (master != null) {
return master;
}
return electMaster.electMaster(activeNodes);
} else {
logger.trace("not enough master nodes [{}]", possibleMasterNodes);
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
logger.trace("not enough master nodes [{}]", activeNodes);
return null;
}
} else {

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.ping;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
/**
* Context handed to {@link ZenPing} implementations via {@link ZenPing#setPingContextProvider}.
* <p>
* Extends {@link DiscoveryNodesProvider} with one extra piece of information: whether the local node
* has already been part of the cluster. Ping implementations pass this value into the ping responses
* they create.
*/
public interface PingContextProvider extends DiscoveryNodesProvider {
/**
* Reports whether this node has previously joined the cluster.
*
* @return true if this node has previously joined the cluster at least once, false if this is the first join
*/
boolean nodeHasJoinedClusterOnce();
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.discovery.zen.ping;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.LifecycleComponent;
@ -27,7 +28,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import java.io.IOException;
@ -39,7 +39,7 @@ import static org.elasticsearch.cluster.node.DiscoveryNode.readNode;
*/
public interface ZenPing extends LifecycleComponent<ZenPing> {
void setNodesProvider(DiscoveryNodesProvider nodesProvider);
void setPingContextProvider(PingContextProvider contextProvider);
void ping(PingListener listener, TimeValue timeout) throws ElasticsearchException;
@ -49,36 +49,52 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
}
public static class PingResponse implements Streamable {
public static final PingResponse[] EMPTY = new PingResponse[0];
private ClusterName clusterName;
private DiscoveryNode target;
private DiscoveryNode node;
private DiscoveryNode master;
private boolean hasJoinedOnce;
private PingResponse() {
}
public PingResponse(DiscoveryNode target, DiscoveryNode master, ClusterName clusterName) {
this.target = target;
/**
* @param node the node which this ping describes
* @param master the current master of the node
* @param clusterName the cluster name of the node
* @param hasJoinedOnce true if the node has successfully joined the cluster before
*/
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, boolean hasJoinedOnce) {
this.node = node;
this.master = master;
this.clusterName = clusterName;
this.hasJoinedOnce = hasJoinedOnce;
}
public ClusterName clusterName() {
return this.clusterName;
}
public DiscoveryNode target() {
return target;
/** the node which this ping describes */
public DiscoveryNode node() {
return node;
}
/** the current master of the node */
public DiscoveryNode master() {
return master;
}
/** true if the node has successfully joined the cluster before */
public boolean hasJoinedOnce() {
return hasJoinedOnce;
}
public static PingResponse readPingResponse(StreamInput in) throws IOException {
PingResponse response = new PingResponse();
response.readFrom(in);
@ -88,27 +104,40 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
@Override
public void readFrom(StreamInput in) throws IOException {
clusterName = readClusterName(in);
target = readNode(in);
node = readNode(in);
if (in.readBoolean()) {
master = readNode(in);
}
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
this.hasJoinedOnce = in.readBoolean();
} else {
// As of 1.4.0 we prefer to elect nodes which have previously successfully joined the cluster.
// Nodes before 1.4.0 do not take this into consideration. If a pre-1.4.0 node elects itself as master
// based on the pings, we need to make sure we do the same. We therefore can not demote it
// and thus mark it as if it has previously joined.
this.hasJoinedOnce = true;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
clusterName.writeTo(out);
target.writeTo(out);
node.writeTo(out);
if (master == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
master.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(hasJoinedOnce);
}
}
@Override
public String toString() {
return "ping_response{target [" + target + "], master [" + master + "], cluster_name[" + clusterName.value() + "]}";
return "ping_response{node [" + node + "], master [" + master + "], hasJoinedOnce [" + hasJoinedOnce + "], cluster_name[" + clusterName.value() + "]}";
}
}
}

View File

@ -33,7 +33,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
@ -92,12 +91,12 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
}
@Override
public void setNodesProvider(DiscoveryNodesProvider nodesProvider) {
public void setPingContextProvider(PingContextProvider contextProvider) {
if (lifecycle.started()) {
throw new ElasticsearchIllegalStateException("Can't set nodes provider when started");
}
for (ZenPing zenPing : zenPings) {
zenPing.setNodesProvider(nodesProvider);
zenPing.setPingContextProvider(contextProvider);
}
}
@ -172,7 +171,7 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
public void onPing(PingResponse[] pings) {
if (pings != null) {
for (PingResponse pingResponse : pings) {
responses.put(pingResponse.target(), pingResponse);
responses.put(pingResponse.node(), pingResponse);
}
}
if (counter.decrementAndGet() == 0) {

View File

@ -40,7 +40,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -77,7 +77,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
private final ClusterName clusterName;
private final NetworkService networkService;
private final Version version;
private volatile DiscoveryNodesProvider nodesProvider;
private volatile PingContextProvider contextProvider;
private final boolean pingEnabled;
@ -112,11 +112,11 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
@Override
public void setNodesProvider(DiscoveryNodesProvider nodesProvider) {
public void setPingContextProvider(PingContextProvider nodesProvider) {
if (lifecycle.started()) {
throw new ElasticsearchIllegalStateException("Can't set nodes provider when started");
}
this.nodesProvider = nodesProvider;
this.contextProvider = nodesProvider;
}
@Override
@ -213,10 +213,11 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput out = new HandlesStreamOutput(bStream);
out.writeBytes(INTERNAL_HEADER);
// TODO: change to min_required version!
Version.writeVersion(version, out);
out.writeInt(id);
clusterName.writeTo(out);
nodesProvider.nodes().localNode().writeTo(out);
contextProvider.nodes().localNode().writeTo(out);
out.close();
multicastChannel.send(bStream.bytes());
if (logger.isTraceEnabled()) {
@ -250,7 +251,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
if (responses == null) {
logger.warn("received ping response {} with no matching id [{}]", request.pingResponse, request.id);
} else {
responses.put(request.pingResponse.target(), request.pingResponse);
responses.put(request.pingResponse.node(), request.pingResponse);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@ -361,14 +362,15 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
return;
}
String clusterName = request.containsKey("cluster_name") ? request.get("cluster_name").toString() : request.containsKey("clusterName") ? request.get("clusterName").toString() : null;
if (clusterName == null) {
final String requestClusterName = request.containsKey("cluster_name") ? request.get("cluster_name").toString() : request.containsKey("clusterName") ? request.get("clusterName").toString() : null;
if (requestClusterName == null) {
logger.warn("malformed external ping request, missing 'cluster_name' element within request, from {}, content {}", remoteAddress, externalPingData);
return;
}
if (!clusterName.equals(MulticastZenPing.this.clusterName.value())) {
logger.trace("got request for cluster_name {}, but our cluster_name is {}, from {}, content {}", clusterName, MulticastZenPing.this.clusterName.value(), remoteAddress, externalPingData);
if (!requestClusterName.equals(clusterName.value())) {
logger.trace("got request for cluster_name {}, but our cluster_name is {}, from {}, content {}",
requestClusterName, clusterName.value(), remoteAddress, externalPingData);
return;
}
if (logger.isTraceEnabled()) {
@ -376,16 +378,16 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
try {
DiscoveryNode localNode = nodesProvider.nodes().localNode();
DiscoveryNode localNode = contextProvider.nodes().localNode();
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
builder.startObject().startObject("response");
builder.field("cluster_name", MulticastZenPing.this.clusterName.value());
builder.field("cluster_name", clusterName.value());
builder.startObject("version").field("number", version.number()).field("snapshot_build", version.snapshot).endObject();
builder.field("transport_address", localNode.address().toString());
if (nodesProvider.nodeService() != null) {
for (Map.Entry<String, String> attr : nodesProvider.nodeService().attributes().entrySet()) {
if (contextProvider.nodeService() != null) {
for (Map.Entry<String, String> attr : contextProvider.nodeService().attributes().entrySet()) {
builder.field(attr.getKey(), attr.getValue());
}
}
@ -406,32 +408,33 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
}
private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName clusterName) {
private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
if (!pingEnabled) {
return;
}
DiscoveryNodes discoveryNodes = nodesProvider.nodes();
final DiscoveryNodes discoveryNodes = contextProvider.nodes();
final DiscoveryNode requestingNode = requestingNodeX;
if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
// that's me, ignore
return;
}
if (!clusterName.equals(MulticastZenPing.this.clusterName)) {
if (!requestClusterName.equals(clusterName)) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] received ping_request from [{}], but wrong cluster_name [{}], expected [{}], ignoring", id, requestingNode, clusterName, MulticastZenPing.this.clusterName);
logger.trace("[{}] received ping_request from [{}], but wrong cluster_name [{}], expected [{}], ignoring",
id, requestingNode, requestClusterName.value(), clusterName.value());
}
return;
}
// don't connect between two client nodes, no need for that...
if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] received ping_request from [{}], both are client nodes, ignoring", id, requestingNode, clusterName);
logger.trace("[{}] received ping_request from [{}], both are client nodes, ignoring", id, requestingNode, requestClusterName);
}
return;
}
final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
multicastPingResponse.id = id;
multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName);
multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
if (logger.isTraceEnabled()) {
logger.trace("[{}] received ping_request from [{}], sending {}", id, requestingNode, multicastPingResponse.pingResponse);

View File

@ -38,8 +38,8 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -73,7 +73,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private final DiscoveryNode[] configuredTargetNodes;
private volatile DiscoveryNodesProvider nodesProvider;
private volatile PingContextProvider contextProvider;
private final AtomicInteger pingIdGenerator = new AtomicInteger();
@ -147,14 +147,14 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
@Override
public void setNodesProvider(DiscoveryNodesProvider nodesProvider) {
this.nodesProvider = nodesProvider;
public void setPingContextProvider(PingContextProvider contextProvider) {
this.contextProvider = contextProvider;
}
/**
* Clears the list of cached ping responses.
*/
public void clearTemporalReponses() {
public void clearTemporalResponses() {
temporalResponses.clear();
}
@ -245,18 +245,20 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
}
void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler) {
final UnicastPingRequest pingRequest = new UnicastPingRequest();
pingRequest.id = sendPingsHandler.id();
pingRequest.timeout = timeout;
DiscoveryNodes discoNodes = nodesProvider.nodes();
pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName);
DiscoveryNodes discoNodes = contextProvider.nodes();
pingRequest.pingResponse = createPingResponse(discoNodes);
HashSet<DiscoveryNode> nodesToPingSet = new HashSet<>();
for (PingResponse temporalResponse : temporalResponses) {
// Only send pings to nodes that have the same cluster name.
if (clusterName.equals(temporalResponse.clusterName())) {
nodesToPingSet.add(temporalResponse.target());
nodesToPingSet.add(temporalResponse.node());
}
}
@ -370,30 +372,30 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
public void handleResponse(UnicastPingResponse response) {
logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses));
try {
DiscoveryNodes discoveryNodes = nodesProvider.nodes();
DiscoveryNodes discoveryNodes = contextProvider.nodes();
for (PingResponse pingResponse : response.pingResponses) {
if (pingResponse.target().id().equals(discoveryNodes.localNodeId())) {
if (pingResponse.node().id().equals(discoveryNodes.localNodeId())) {
// that's us, ignore
continue;
}
if (!pingResponse.clusterName().equals(clusterName)) {
// not part of the cluster
logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", id, pingResponse.target(), pingResponse.clusterName().value());
logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", id, pingResponse.node(), pingResponse.clusterName().value());
continue;
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.get(response.id);
if (responses == null) {
logger.warn("received ping response {} with no matching id [{}]", pingResponse, response.id);
} else {
PingResponse existingResponse = responses.get(pingResponse.target());
PingResponse existingResponse = responses.get(pingResponse.node());
if (existingResponse == null) {
responses.put(pingResponse.target(), pingResponse);
responses.put(pingResponse.node(), pingResponse);
} else {
// try and merge the best ping response for it, i.e. if the new one
// doesn't have the master node set, and the existing one does, then
// the existing one is better, so we keep it
if (pingResponse.master() != null) {
responses.put(pingResponse.target(), pingResponse);
responses.put(pingResponse.node(), pingResponse);
}
}
}
@ -429,8 +431,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
});
List<PingResponse> pingResponses = newArrayList(temporalResponses);
DiscoveryNodes discoNodes = nodesProvider.nodes();
pingResponses.add(new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName));
pingResponses.add(createPingResponse(contextProvider.nodes()));
UnicastPingResponse unicastPingResponse = new UnicastPingResponse();
@ -486,6 +487,10 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
}
/**
* Builds the ping response describing the local node.
*
* @param discoNodes the nodes view from which the local node and its current master are taken
* @return a ping response carrying the local node, its master (as reported by {@code discoNodes}),
*         this pinger's cluster name and whether the local node has joined the cluster before
*/
private PingResponse createPingResponse(DiscoveryNodes discoNodes) {
final DiscoveryNode self = discoNodes.localNode();
final DiscoveryNode currentMaster = discoNodes.masterNode();
final boolean joinedBefore = contextProvider.nodeHasJoinedClusterOnce();
return new PingResponse(self, currentMaster, clusterName, joinedBefore);
}
static class UnicastPingResponse extends TransportResponse {
int id;

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
@ -39,7 +40,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.Requests.clusterHealthRequest;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.TEST, numDataNodes =0)
public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
@ -200,8 +201,7 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
logger.info("--> start two more nodes");
internalCluster().startNode(settings);
internalCluster().startNode(settings);
internalCluster().startNodesAsync(2, settings).get();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
@ -234,8 +234,7 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
assertNoMasterBlockOnAllNodes();
logger.info("--> start back the 2 nodes ");
internalCluster().startNode(settings);
internalCluster().startNode(settings);
String[] newNodes = internalCluster().startNodesAsync(2, settings).get().toArray(Strings.EMPTY_ARRAY);
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
@ -248,6 +247,8 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(4));
// we prefer to elect up and running nodes
assertThat(state.nodes().masterNodeId(), not(isOneOf(newNodes)));
logger.info("--> verify we the data back");
for (int i = 0; i < 10; i++) {

View File

@ -167,7 +167,7 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
for (ZenPing zenPing : pingService.zenPings()) {
if (zenPing instanceof UnicastZenPing) {
((UnicastZenPing) zenPing).clearTemporalReponses();
((UnicastZenPing) zenPing).clearTemporalResponses();
}
}
}
@ -629,7 +629,7 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
// includes all the other nodes that have pinged it and the issue doesn't manifest
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
for (ZenPing zenPing : pingService.zenPings()) {
((UnicastZenPing) zenPing).clearTemporalReponses();
((UnicastZenPing) zenPing).clearTemporalResponses();
}
}
@ -668,7 +668,7 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
// includes all the other nodes that have pinged it and the issue doesn't manifest
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
for (ZenPing zenPing : pingService.zenPings()) {
((UnicastZenPing) zenPing).clearTemporalReponses();
((UnicastZenPing) zenPing).clearTemporalResponses();
}
}

View File

@ -29,7 +29,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.test.ElasticsearchTestCase;
@ -65,15 +65,15 @@ public class MulticastZenPingTests extends ElasticsearchTestCase {
settings = buildRandomMulticast(settings);
ThreadPool threadPool = new ThreadPool("testSimplePings");
ClusterName clusterName = new ClusterName("test");
final ClusterName clusterName = new ClusterName("test");
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceB.boundAddress().publishAddress(), Version.CURRENT);
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT);
zenPingA.setNodesProvider(new DiscoveryNodesProvider() {
zenPingA.setPingContextProvider(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().put(nodeA).localNodeId("A").build();
@ -83,11 +83,16 @@ public class MulticastZenPingTests extends ElasticsearchTestCase {
public NodeService nodeService() {
// This test stub supplies no NodeService.
return null;
}
// Stub answer for the provider set on zenPingA: this node reports that it has never
// joined a cluster. The test later asserts that the ping response coming from node "A"
// has hasJoinedOnce() == false.
@Override
public boolean nodeHasJoinedClusterOnce() {
return false;
}
});
zenPingA.start();
MulticastZenPing zenPingB = new MulticastZenPing(threadPool, transportServiceB, clusterName, Version.CURRENT);
zenPingB.setNodesProvider(new DiscoveryNodesProvider() {
zenPingB.setPingContextProvider(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().put(nodeB).localNodeId("B").build();
@ -97,13 +102,27 @@ public class MulticastZenPingTests extends ElasticsearchTestCase {
public NodeService nodeService() {
// This test stub supplies no NodeService.
return null;
}
// Stub answer for the provider set on zenPingB: this node reports that it has already
// joined a cluster. The test later asserts that the ping response coming from node "B"
// has hasJoinedOnce() == true.
@Override
public boolean nodeHasJoinedClusterOnce() {
return true;
}
});
zenPingB.start();
try {
logger.info("ping from A");
ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.length, equalTo(1));
assertThat(pingResponses[0].target().id(), equalTo("B"));
assertThat(pingResponses[0].node().id(), equalTo("B"));
assertTrue(pingResponses[0].hasJoinedOnce());
logger.info("ping from B");
pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.length, equalTo(1));
assertThat(pingResponses[0].node().id(), equalTo("A"));
assertFalse(pingResponses[0].hasJoinedOnce());
} finally {
zenPingA.close();
zenPingB.close();
@ -118,13 +137,13 @@ public class MulticastZenPingTests extends ElasticsearchTestCase {
Settings settings = ImmutableSettings.EMPTY;
settings = buildRandomMulticast(settings);
ThreadPool threadPool = new ThreadPool("testExternalPing");
ClusterName clusterName = new ClusterName("test");
final ThreadPool threadPool = new ThreadPool("testExternalPing");
final ClusterName clusterName = new ClusterName("test");
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT), threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT);
zenPingA.setNodesProvider(new DiscoveryNodesProvider() {
zenPingA.setPingContextProvider(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().put(nodeA).localNodeId("A").build();
@ -134,6 +153,11 @@ public class MulticastZenPingTests extends ElasticsearchTestCase {
public NodeService nodeService() {
// This test stub supplies no NodeService.
return null;
}
// Stub answer: this node reports that it has never joined a cluster.
@Override
public boolean nodeHasJoinedClusterOnce() {
return false;
}
});
zenPingA.start();

View File

@ -29,8 +29,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.test.ElasticsearchTestCase;
@ -76,7 +76,7 @@ public class UnicastZenPingTests extends ElasticsearchTestCase {
.build();
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, electMasterService, null);
zenPingA.setNodesProvider(new DiscoveryNodesProvider() {
zenPingA.setPingContextProvider(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().put(nodeA).localNodeId("UZP_A").build();
@ -86,11 +86,16 @@ public class UnicastZenPingTests extends ElasticsearchTestCase {
public NodeService nodeService() {
// This test stub supplies no NodeService.
return null;
}
// Stub answer for the provider set on zenPingA: this node reports that it has never
// joined a cluster. The test later asserts that the ping response coming from node
// "UZP_A" has hasJoinedOnce() == false.
@Override
public boolean nodeHasJoinedClusterOnce() {
return false;
}
});
zenPingA.start();
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, electMasterService, null);
zenPingB.setNodesProvider(new DiscoveryNodesProvider() {
zenPingB.setPingContextProvider(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().put(nodeB).localNodeId("UZP_B").build();
@ -100,13 +105,28 @@ public class UnicastZenPingTests extends ElasticsearchTestCase {
public NodeService nodeService() {
// This test stub supplies no NodeService.
return null;
}
// Stub answer for the provider set on zenPingB: this node reports that it has already
// joined a cluster. The test later asserts that the ping response coming from node
// "UZP_B" has hasJoinedOnce() == true.
@Override
public boolean nodeHasJoinedClusterOnce() {
return true;
}
});
zenPingB.start();
try {
logger.info("ping from UZP_A");
ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.length, equalTo(1));
assertThat(pingResponses[0].target().id(), equalTo("UZP_B"));
assertThat(pingResponses[0].node().id(), equalTo("UZP_B"));
assertTrue(pingResponses[0].hasJoinedOnce());
// ping again, this time from B,
logger.info("ping from UZP_B");
pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.length, equalTo(1));
assertThat(pingResponses[0].node().id(), equalTo("UZP_A"));
assertFalse(pingResponses[0].hasJoinedOnce());
} finally {
zenPingA.close();
zenPingB.close();