Discovery: Give a unique id to each ping response

During discovery a node gossips with other nodes to discover the current state of the cluster - what nodes are out there, what version they use and most importantly whether there is an active master out there. During this ping process we may end up in a situation where old information is mixed with new. This is comment if a couple of master election happen in rapid succession.

This commit adds a monotonically increasing id to each ping response. This makes it easy to always select the last ping from every node.

Closes #7769
This commit is contained in:
Boaz Leskes 2014-09-12 14:01:41 +02:00
parent afcbffbfc1
commit 41fd5d02f4
5 changed files with 173 additions and 50 deletions

View File

@ -30,6 +30,9 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.cluster.ClusterName.readClusterName;
import static org.elasticsearch.cluster.node.DiscoveryNode.readNode;
@ -52,6 +55,12 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
public static final PingResponse[] EMPTY = new PingResponse[0];
private static final AtomicLong idGenerator = new AtomicLong();
// an always increasing unique identifier for this ping response.
// lower values means older pings.
private long id;
private ClusterName clusterName;
private DiscoveryNode node;
@ -70,12 +79,21 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
* @param hasJoinedOnce true if the joined has successfully joined the cluster before
*/
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, boolean hasJoinedOnce) {
this.id = idGenerator.incrementAndGet();
this.node = node;
this.master = master;
this.clusterName = clusterName;
this.hasJoinedOnce = hasJoinedOnce;
}
/**
* an always increasing unique identifier for this ping response.
* lower values means older pings.
*/
public long id() {
return this.id;
}
public ClusterName clusterName() {
return this.clusterName;
}
@ -108,16 +126,8 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
if (in.readBoolean()) {
master = readNode(in);
}
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
this.hasJoinedOnce = in.readBoolean();
} else {
// As of 1.4.0 we prefer to elect nodes which have previously successfully joined the cluster.
// Nodes before 1.4.0 do not take this into consideration. If pre<1.4.0 node elects it self as master
// based on the pings, we need to make sure we do the same. We therefore can not demote it
// and thus mark it as if it has previously joined.
this.hasJoinedOnce = true;
}
this.id = in.readLong();
}
@Override
@ -130,14 +140,55 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
out.writeBoolean(true);
master.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
out.writeBoolean(hasJoinedOnce);
}
out.writeLong(id);
}
@Override
public String toString() {
return "ping_response{node [" + node + "], master [" + master + "], hasJoinedOnce [" + hasJoinedOnce + "], cluster_name[" + clusterName.value() + "]}";
return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "], hasJoinedOnce [" + hasJoinedOnce + "], cluster_name[" + clusterName.value() + "]}";
}
}
/**
* a utility collection of pings where only the most recent ping is stored per node
*/
public static class PingCollection {
Map<DiscoveryNode, PingResponse> pings;
public PingCollection() {
pings = new HashMap<>();
}
/**
* adds a ping if newer than previous pings from the same node
*
* @return true if added, false o.w.
*/
public synchronized boolean addPing(PingResponse ping) {
PingResponse existingResponse = pings.get(ping.node());
// in case both existing and new ping have the same id (probably because they come
// from nodes from version <1.4.0) we prefer to use the last added one.
if (existingResponse == null || existingResponse.id() <= ping.id()) {
pings.put(ping.node(), ping);
return true;
}
return false;
}
/** adds multiple pings if newer than previous pings from the same node */
public synchronized void addPings(PingResponse[] pings) {
for (PingResponse ping : pings) {
addPing(ping);
}
}
/** serialize current pings to an array */
public synchronized PingResponse[] toArray() {
return pings.values().toArray(new PingResponse[pings.size()]);
}
}
}

View File

@ -24,14 +24,12 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
@ -41,7 +39,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -160,7 +157,7 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
private final AtomicInteger counter;
private ConcurrentMap<DiscoveryNode, PingResponse> responses = ConcurrentCollections.newConcurrentMap();
private PingCollection responses = new PingCollection();
private CompoundPingListener(PingListener listener, ImmutableList<? extends ZenPing> zenPings) {
this.listener = listener;
@ -170,12 +167,10 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
@Override
public void onPing(PingResponse[] pings) {
if (pings != null) {
for (PingResponse pingResponse : pings) {
responses.put(pingResponse.node(), pingResponse);
}
responses.addPings(pings);
}
if (counter.decrementAndGet() == 0) {
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
listener.onPing(responses.toArray());
}
}
}

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.network.MulticastChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -48,7 +47,6 @@ import org.elasticsearch.transport.*;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -84,7 +82,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
private volatile MulticastChannel multicastChannel;
private final AtomicInteger pingIdGenerator = new AtomicInteger();
private final Map<Integer, ConcurrentMap<DiscoveryNode, PingResponse>> receivedResponses = newConcurrentMap();
private final Map<Integer, PingCollection> receivedResponses = newConcurrentMap();
public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) {
this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version);
@ -185,7 +183,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
return;
}
final int id = pingIdGenerator.incrementAndGet();
receivedResponses.put(id, ConcurrentCollections.<DiscoveryNode, PingResponse>newConcurrentMap());
receivedResponses.put(id, new PingCollection());
sendPingRequest(id);
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
@ -202,8 +200,8 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(id);
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
PingCollection responses = receivedResponses.remove(id);
listener.onPing(responses.toArray());
}
});
}
@ -247,11 +245,11 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
if (logger.isTraceEnabled()) {
logger.trace("[{}] received {}", request.id, request.pingResponse);
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.get(request.id);
PingCollection responses = receivedResponses.get(request.id);
if (responses == null) {
logger.warn("received ping response {} with no matching id [{}]", request.pingResponse, request.id);
} else {
responses.put(request.pingResponse.node(), request.pingResponse);
responses.addPing(request.pingResponse);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -75,7 +75,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private volatile PingContextProvider contextProvider;
private final AtomicInteger pingIdGenerator = new AtomicInteger();
private final AtomicInteger pingHandlerIdGenerator = new AtomicInteger();
// used to generate unique ids for nodes/address we temporarily connect to
private final AtomicInteger unicastNodeIdGenerator = new AtomicInteger();
@ -83,7 +83,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
// used as a node id prefix for nodes/address we temporarily connect to
private static final String UNICAST_NODE_PREFIX = "#zen_unicast_";
private final Map<Integer, ConcurrentMap<DiscoveryNode, PingResponse>> receivedResponses = newConcurrentMap();
private final Map<Integer, PingCollection> receivedResponses = newConcurrentMap();
// a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes)
private final Queue<PingResponse> temporalResponses = ConcurrentCollections.newQueue();
@ -183,8 +183,8 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
@Override
public void ping(final PingListener listener, final TimeValue timeout) throws ElasticsearchException {
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingIdGenerator.incrementAndGet());
receivedResponses.put(sendPingsHandler.id(), ConcurrentCollections.<DiscoveryNode, PingResponse>newConcurrentMap());
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet());
receivedResponses.put(sendPingsHandler.id(), new PingCollection());
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
@Override
@ -196,13 +196,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
public void run() {
try {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(sendPingsHandler.id());
PingCollection responses = receivedResponses.remove(sendPingsHandler.id());
sendPingsHandler.close();
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
transportService.disconnectFromNode(node);
}
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
listener.onPing(responses.toArray());
} catch (EsRejectedExecutionException ex) {
logger.debug("Ping execution rejected", ex);
}
@ -397,22 +397,11 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", id, pingResponse.node(), pingResponse.clusterName().value());
continue;
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.get(response.id);
PingCollection responses = receivedResponses.get(response.id);
if (responses == null) {
logger.warn("received ping response {} with no matching id [{}]", pingResponse, response.id);
logger.warn("received ping response {} with no matching handler id [{}]", pingResponse, response.id);
} else {
PingResponse existingResponse = responses.get(pingResponse.node());
if (existingResponse == null) {
responses.put(pingResponse.node(), pingResponse);
} else {
// try and merge the best ping response for it, i.e. if the new one
// doesn't have the master node set, and the existing one does, then
// the existing one is better, so we keep it
// if both have a master or both have none, we prefer the latest ping
if (existingResponse.master() == null || pingResponse.master() != null) {
responses.put(pingResponse.node(), pingResponse);
}
}
responses.addPing(pingResponse);
}
}
} finally {

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
public class ZenPingTests extends ElasticsearchTestCase {
@Test
public void testPingCollection() {
DiscoveryNode[] nodes = new DiscoveryNode[randomIntBetween(1, 30)];
long maxIdPerNode[] = new long[nodes.length];
DiscoveryNode masterPerNode[] = new DiscoveryNode[nodes.length];
boolean hasJoinedOncePerNode[] = new boolean[nodes.length];
ArrayList<ZenPing.PingResponse> pings = new ArrayList<>();
for (int i = 0; i < nodes.length; i++) {
nodes[i] = new DiscoveryNode("" + i, DummyTransportAddress.INSTANCE, Version.CURRENT);
}
for (int pingCount = scaledRandomIntBetween(10, nodes.length * 10); pingCount > 0; pingCount--) {
int node = randomInt(nodes.length - 1);
DiscoveryNode masterNode = null;
if (randomBoolean()) {
masterNode = nodes[randomInt(nodes.length - 1)];
}
boolean hasJoinedOnce = randomBoolean();
ZenPing.PingResponse ping = new ZenPing.PingResponse(nodes[node], masterNode, ClusterName.DEFAULT, hasJoinedOnce);
if (rarely()) {
// ignore some pings
continue;
}
// update max ping info
maxIdPerNode[node] = ping.id();
masterPerNode[node] = masterNode;
hasJoinedOncePerNode[node] = hasJoinedOnce;
pings.add(ping);
}
// shuffle
Collections.shuffle(pings);
ZenPing.PingCollection collection = new ZenPing.PingCollection();
collection.addPings(pings.toArray(new ZenPing.PingResponse[pings.size()]));
ZenPing.PingResponse[] aggregate = collection.toArray();
for (ZenPing.PingResponse ping : aggregate) {
int nodeId = Integer.parseInt(ping.node().id());
assertThat(maxIdPerNode[nodeId], equalTo(ping.id()));
assertThat(masterPerNode[nodeId], equalTo(ping.master()));
assertThat(hasJoinedOncePerNode[nodeId], equalTo(ping.hasJoinedOnce()));
maxIdPerNode[nodeId] = -1; // mark as seen
}
for (int i = 0; i < maxIdPerNode.length; i++) {
assertTrue("node " + i + " had pings but it was not found in collection", maxIdPerNode[i] <= 0);
}
}
}