diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java index 5beddfcf727..533a1e101b0 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java @@ -30,6 +30,9 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.cluster.ClusterName.readClusterName; import static org.elasticsearch.cluster.node.DiscoveryNode.readNode; @@ -52,6 +55,12 @@ public interface ZenPing extends LifecycleComponent { public static final PingResponse[] EMPTY = new PingResponse[0]; + private static final AtomicLong idGenerator = new AtomicLong(); + + // an always increasing unique identifier for this ping response. + // lower values means older pings. + private long id; + private ClusterName clusterName; private DiscoveryNode node; @@ -70,12 +79,21 @@ public interface ZenPing extends LifecycleComponent { * @param hasJoinedOnce true if the joined has successfully joined the cluster before */ public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, boolean hasJoinedOnce) { + this.id = idGenerator.incrementAndGet(); this.node = node; this.master = master; this.clusterName = clusterName; this.hasJoinedOnce = hasJoinedOnce; } + /** + * an always increasing unique identifier for this ping response. + * lower values means older pings. + */ + public long id() { + return this.id; + } + public ClusterName clusterName() { return this.clusterName; } @@ -108,16 +126,8 @@ public interface ZenPing extends LifecycleComponent { if (in.readBoolean()) { master = readNode(in); } - if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) { - this.hasJoinedOnce = in.readBoolean(); - } else { - // As of 1.4.0 we prefer to elect nodes which have previously successfully joined the cluster. - // Nodes before 1.4.0 do not take this into consideration. If pre<1.4.0 node elects it self as master - // based on the pings, we need to make sure we do the same. We therefore can not demote it - // and thus mark it as if it has previously joined. - this.hasJoinedOnce = true; - } - + this.hasJoinedOnce = in.readBoolean(); + this.id = in.readLong(); } @Override @@ -130,14 +140,55 @@ public interface ZenPing extends LifecycleComponent { out.writeBoolean(true); master.writeTo(out); } - if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) { - out.writeBoolean(hasJoinedOnce); - } + out.writeBoolean(hasJoinedOnce); + out.writeLong(id); } @Override public String toString() { - return "ping_response{node [" + node + "], master [" + master + "], hasJoinedOnce [" + hasJoinedOnce + "], cluster_name[" + clusterName.value() + "]}"; + return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "], hasJoinedOnce [" + hasJoinedOnce + "], cluster_name[" + clusterName.value() + "]}"; } } + + + /** + * a utility collection of pings where only the most recent ping is stored per node + */ + public static class PingCollection { + + Map pings; + + public PingCollection() { + pings = new HashMap<>(); + } + + /** + * adds a ping if newer than previous pings from the same node + * + * @return true if added, false o.w. + */ + public synchronized boolean addPing(PingResponse ping) { + PingResponse existingResponse = pings.get(ping.node()); + // in case both existing and new ping have the same id (probably because they come + // from nodes from version <1.4.0) we prefer to use the last added one. + if (existingResponse == null || existingResponse.id() <= ping.id()) { + pings.put(ping.node(), ping); + return true; + } + return false; + } + + /** adds multiple pings if newer than previous pings from the same node */ + public synchronized void addPings(PingResponse[] pings) { + for (PingResponse ping : pings) { + addPing(ping); + } + } + + /** serialize current pings to an array */ + public synchronized PingResponse[] toArray() { + return pings.values().toArray(new PingResponse[pings.size()]); + } + + } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java index 6e693454219..ee1fc6dcd0e 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java @@ -24,14 +24,12 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing; @@ -41,7 +39,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.Set; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -160,7 +157,7 @@ public class ZenPingService extends AbstractLifecycleComponent implemen private final AtomicInteger counter; - private ConcurrentMap responses = ConcurrentCollections.newConcurrentMap(); + private PingCollection responses = new PingCollection(); private CompoundPingListener(PingListener listener, ImmutableList zenPings) { this.listener = listener; @@ -170,12 +167,10 @@ public class ZenPingService extends AbstractLifecycleComponent implemen @Override public void onPing(PingResponse[] pings) { if (pings != null) { - for (PingResponse pingResponse : pings) { - responses.put(pingResponse.node(), pingResponse); - } + responses.addPings(pings); } if (counter.decrementAndGet() == 0) { - listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); + listener.onPing(responses.toArray()); } } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 39ca9b9a91a..efb02e17576 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.network.MulticastChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -48,7 +47,6 @@ import org.elasticsearch.transport.*; import java.io.IOException; import java.net.SocketAddress; import java.util.Map; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -84,7 +82,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem private volatile MulticastChannel multicastChannel; private final AtomicInteger pingIdGenerator = new AtomicInteger(); - private final Map> receivedResponses = newConcurrentMap(); + private final Map receivedResponses = newConcurrentMap(); public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) { this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version); @@ -185,7 +183,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem return; } final int id = pingIdGenerator.incrementAndGet(); - receivedResponses.put(id, ConcurrentCollections.newConcurrentMap()); + receivedResponses.put(id, new PingCollection()); sendPingRequest(id); // try and send another ping request halfway through (just in case someone woke up during it...) // this can be a good trade-off to nailing the initial lookup or un-delivered messages @@ -202,8 +200,8 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() { @Override public void run() { - ConcurrentMap responses = receivedResponses.remove(id); - listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); + PingCollection responses = receivedResponses.remove(id); + listener.onPing(responses.toArray()); } }); } @@ -247,11 +245,11 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem if (logger.isTraceEnabled()) { logger.trace("[{}] received {}", request.id, request.pingResponse); } - ConcurrentMap responses = receivedResponses.get(request.id); + PingCollection responses = receivedResponses.get(request.id); if (responses == null) { logger.warn("received ping response {} with no matching id [{}]", request.pingResponse, request.id); } else { - responses.put(request.pingResponse.node(), request.pingResponse); + responses.addPing(request.pingResponse); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 158b805fc6a..fbad509b648 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -75,7 +75,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private volatile PingContextProvider contextProvider; - private final AtomicInteger pingIdGenerator = new AtomicInteger(); + private final AtomicInteger pingHandlerIdGenerator = new AtomicInteger(); // used to generate unique ids for nodes/address we temporarily connect to private final AtomicInteger unicastNodeIdGenerator = new AtomicInteger(); @@ -83,7 +83,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen // used as a node id prefix for nodes/address we temporarily connect to private static final String UNICAST_NODE_PREFIX = "#zen_unicast_"; - private final Map> receivedResponses = newConcurrentMap(); + private final Map receivedResponses = newConcurrentMap(); // a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes) private final Queue temporalResponses = ConcurrentCollections.newQueue(); @@ -183,8 +183,8 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen @Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticsearchException { - final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingIdGenerator.incrementAndGet()); - receivedResponses.put(sendPingsHandler.id(), ConcurrentCollections.newConcurrentMap()); + final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet()); + receivedResponses.put(sendPingsHandler.id(), new PingCollection()); sendPings(timeout, null, sendPingsHandler); threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() { @Override @@ -196,13 +196,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen public void run() { try { sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler); - ConcurrentMap responses = receivedResponses.remove(sendPingsHandler.id()); + PingCollection responses = receivedResponses.remove(sendPingsHandler.id()); sendPingsHandler.close(); for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node); transportService.disconnectFromNode(node); } - listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); + listener.onPing(responses.toArray()); } catch (EsRejectedExecutionException ex) { logger.debug("Ping execution rejected", ex); } @@ -397,22 +397,11 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", id, pingResponse.node(), pingResponse.clusterName().value()); continue; } - ConcurrentMap responses = receivedResponses.get(response.id); + PingCollection responses = receivedResponses.get(response.id); if (responses == null) { - logger.warn("received ping response {} with no matching id [{}]", pingResponse, response.id); + logger.warn("received ping response {} with no matching handler id [{}]", pingResponse, response.id); } else { - PingResponse existingResponse = responses.get(pingResponse.node()); - if (existingResponse == null) { - responses.put(pingResponse.node(), pingResponse); - } else { - // try and merge the best ping response for it, i.e. if the new one - // doesn't have the master node set, and the existing one does, then - // the existing one is better, so we keep it - // if both have a master or both have none, we prefer the latest ping - if (existingResponse.master() == null || pingResponse.master() != null) { - responses.put(pingResponse.node(), pingResponse); - } - } + responses.addPing(pingResponse); } } } finally { diff --git a/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java b/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java new file mode 100644 index 00000000000..6ded8a9f950 --- /dev/null +++ b/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.discovery.zen.ping.ZenPing; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; + +public class ZenPingTests extends ElasticsearchTestCase { + + @Test + public void testPingCollection() { + DiscoveryNode[] nodes = new DiscoveryNode[randomIntBetween(1, 30)]; + long maxIdPerNode[] = new long[nodes.length]; + DiscoveryNode masterPerNode[] = new DiscoveryNode[nodes.length]; + boolean hasJoinedOncePerNode[] = new boolean[nodes.length]; + ArrayList pings = new ArrayList<>(); + for (int i = 0; i < nodes.length; i++) { + nodes[i] = new DiscoveryNode("" + i, DummyTransportAddress.INSTANCE, Version.CURRENT); + } + + for (int pingCount = scaledRandomIntBetween(10, nodes.length * 10); pingCount > 0; pingCount--) { + int node = randomInt(nodes.length - 1); + DiscoveryNode masterNode = null; + if (randomBoolean()) { + masterNode = nodes[randomInt(nodes.length - 1)]; + } + boolean hasJoinedOnce = randomBoolean(); + ZenPing.PingResponse ping = new ZenPing.PingResponse(nodes[node], masterNode, ClusterName.DEFAULT, hasJoinedOnce); + if (rarely()) { + // ignore some pings + continue; + } + // update max ping info + maxIdPerNode[node] = ping.id(); + masterPerNode[node] = masterNode; + hasJoinedOncePerNode[node] = hasJoinedOnce; + pings.add(ping); + } + + // shuffle + Collections.shuffle(pings); + + ZenPing.PingCollection collection = new ZenPing.PingCollection(); + collection.addPings(pings.toArray(new ZenPing.PingResponse[pings.size()])); + + ZenPing.PingResponse[] aggregate = collection.toArray(); + + for (ZenPing.PingResponse ping : aggregate) { + int nodeId = Integer.parseInt(ping.node().id()); + assertThat(maxIdPerNode[nodeId], equalTo(ping.id())); + assertThat(masterPerNode[nodeId], equalTo(ping.master())); + assertThat(hasJoinedOncePerNode[nodeId], equalTo(ping.hasJoinedOnce())); + + maxIdPerNode[nodeId] = -1; // mark as seen + } + + for (int i = 0; i < maxIdPerNode.length; i++) { + assertTrue("node " + i + " had pings but it was not found in collection", maxIdPerNode[i] <= 0); + } + + + } +}