Discovery: UnicastZenPing - use temporary node ids if can't resolve node by its address

The Unicast Zen Ping mechanism is configured to ping certain host:port combinations in order to discover other nodes. Since this is only a ping, we do not set up a full connection but rather do a light connect with one channel. This light connection is closed at the end of the pinging.

During pinging, we may discover disco nodes which are not yet connected (via temporalResponses). UnicastZenPing will set up the same light connection for those nodes. However, during pinging a cluster state may arrive with those nodes in it. In that case, we will mistakenly believe those nodes are connected and at the end of pinging we will mistakenly disconnect those valid nodes.

This commit makes sure that all nodes UnicastZenPing connects to have a unique id and can be safely disconnected.

Closes #7719
This commit is contained in:
Boaz Leskes 2014-09-14 22:47:17 +02:00
parent 7ca64237a8
commit 2250f58757
1 changed files with 30 additions and 18 deletions

View File

@ -77,6 +77,12 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private final AtomicInteger pingIdGenerator = new AtomicInteger(); private final AtomicInteger pingIdGenerator = new AtomicInteger();
// used to generate unique ids for nodes/address we temporarily connect to
private final AtomicInteger unicastNodeIdGenerator = new AtomicInteger();
// used as a node id prefix for nodes/address we temporarily connect to
private static final String UNICAST_NODE_PREFIX = "#zen_unicast_";
private final Map<Integer, ConcurrentMap<DiscoveryNode, PingResponse>> receivedResponses = newConcurrentMap(); private final Map<Integer, ConcurrentMap<DiscoveryNode, PingResponse>> receivedResponses = newConcurrentMap();
// a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes) // a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes)
@ -108,13 +114,12 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects); logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects);
List<DiscoveryNode> configuredTargetNodes = Lists.newArrayList(); List<DiscoveryNode> configuredTargetNodes = Lists.newArrayList();
int idCounter = 0;
for (String host : hosts) { for (String host : hosts) {
try { try {
TransportAddress[] addresses = transportService.addressesFromString(host); TransportAddress[] addresses = transportService.addressesFromString(host);
// we only limit to 1 addresses, makes no sense to ping 100 ports // we only limit to 1 addresses, makes no sense to ping 100 ports
for (int i = 0; (i < addresses.length && i < LIMIT_PORTS_COUNT); i++) { for (int i = 0; (i < addresses.length && i < LIMIT_PORTS_COUNT); i++) {
configuredTargetNodes.add(new DiscoveryNode("#zen_unicast_" + (++idCounter) + "#", addresses[i], version.minimumCompatibilityVersion())); configuredTargetNodes.add(new DiscoveryNode(UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", addresses[i], version.minimumCompatibilityVersion()));
} }
} catch (Exception e) { } catch (Exception e) {
throw new ElasticsearchIllegalArgumentException("Failed to resolve address for [" + host + "]", e); throw new ElasticsearchIllegalArgumentException("Failed to resolve address for [" + host + "]", e);
@ -281,28 +286,35 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
for (final DiscoveryNode node : nodesToPing) { for (final DiscoveryNode node : nodesToPing) {
// make sure we are connected // make sure we are connected
boolean nodeFoundByAddressX; final boolean nodeFoundByAddress;
DiscoveryNode nodeToSendX = discoNodes.findByAddress(node.address()); DiscoveryNode nodeToSend = discoNodes.findByAddress(node.address());
if (nodeToSendX != null) { if (nodeToSend != null) {
nodeFoundByAddressX = true; nodeFoundByAddress = true;
} else { } else {
nodeToSendX = node; nodeToSend = node;
nodeFoundByAddressX = false; nodeFoundByAddress = false;
} }
final DiscoveryNode nodeToSend = nodeToSendX;
final boolean nodeFoundByAddress = nodeFoundByAddressX;
if (!transportService.nodeConnected(nodeToSend)) { if (!transportService.nodeConnected(nodeToSend)) {
if (sendPingsHandler.isClosed()) { if (sendPingsHandler.isClosed()) {
return; return;
} }
// only disconnect from nodes that we will end up creating a light connection to, as they are temporal
// if we find on the disco nodes a matching node by address, we are going to restore the connection // if we find on the disco nodes a matching node by address, we are going to restore the connection
// anyhow down the line if its not connected... // anyhow down the line if its not connected...
// if we can't resolve the node, we don't know and we have to clean up after pinging. We do have
// to make sure we don't disconnect a true node which was temporarily removed from the DiscoveryNodes
// but will be added again during the pinging. We therefore create a new temporary node
if (!nodeFoundByAddress) { if (!nodeFoundByAddress) {
DiscoveryNode tempNode = new DiscoveryNode("",
UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "_" + nodeToSend.id(),
nodeToSend.getHostName(), nodeToSend.getHostAddress(), nodeToSend.address(), nodeToSend.attributes(), nodeToSend.version()
);
logger.trace("replacing {} with temp node {}", nodeToSend, tempNode);
nodeToSend = tempNode;
sendPingsHandler.nodeToDisconnect.add(nodeToSend); sendPingsHandler.nodeToDisconnect.add(nodeToSend);
} }
// fork the connection to another thread // fork the connection to another thread
final DiscoveryNode finalNodeToSend = nodeToSend;
sendPingsHandler.executor().execute(new Runnable() { sendPingsHandler.executor().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -313,16 +325,16 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
try { try {
// connect to the node, see if we manage to do it, if not, bail // connect to the node, see if we manage to do it, if not, bail
if (!nodeFoundByAddress) { if (!nodeFoundByAddress) {
logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), nodeToSend); logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), finalNodeToSend);
transportService.connectToNodeLight(nodeToSend); transportService.connectToNodeLight(finalNodeToSend);
} else { } else {
logger.trace("[{}] connecting to {}", sendPingsHandler.id(), nodeToSend); logger.trace("[{}] connecting to {}", sendPingsHandler.id(), finalNodeToSend);
transportService.connectToNode(nodeToSend); transportService.connectToNode(finalNodeToSend);
} }
logger.trace("[{}] connected to {}", sendPingsHandler.id(), node); logger.trace("[{}] connected to {}", sendPingsHandler.id(), node);
if (receivedResponses.containsKey(sendPingsHandler.id())) { if (receivedResponses.containsKey(sendPingsHandler.id())) {
// we are connected and still in progress, send the ping request // we are connected and still in progress, send the ping request
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend); sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend);
} else { } else {
// connect took too long, just log it and bail // connect took too long, just log it and bail
latch.countDown(); latch.countDown();
@ -331,9 +343,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
success = true; success = true;
} catch (ConnectTransportException e) { } catch (ConnectTransportException e) {
// can't connect to the node - this is a more common path! // can't connect to the node - this is a more common path!
logger.trace("[{}] failed to connect to {}", e, sendPingsHandler.id(), nodeToSend); logger.trace("[{}] failed to connect to {}", e, sendPingsHandler.id(), finalNodeToSend);
} catch (Throwable e) { } catch (Throwable e) {
logger.warn("[{}] failed send ping to {}", e, sendPingsHandler.id(), nodeToSend); logger.warn("[{}] failed send ping to {}", e, sendPingsHandler.id(), finalNodeToSend);
} finally { } finally {
if (!success) { if (!success) {
latch.countDown(); latch.countDown();