Do not reply to pings from another cluster

Today we reply unconditionally to discovery pings. This commit modifies
the ping request handler so that it does not reply when the cluster name
in the request does not match the local cluster name.

This addresses a race condition identified after reducing the timeout in
UnicastZenPingTests#testSimplePings. In particular, we send pings in the
following way:
 - if not connected to the node, connect to the node and after
   successful handshake, send a ping
 - if connected to the node, send a ping
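
The two cases above can be sketched roughly as follows. This is a
simplified, synchronous illustration, not the actual UnicastZenPing
code: the class PingSenderSketch, its methods, and the string-based
log are hypothetical names introduced only for this example, and the
handshake is assumed to succeed exactly when the cluster names match.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the ping sender (an assumption for illustration).
class PingSenderSketch {
    // Nodes we currently hold a connection to.
    private final Set<String> connected = new HashSet<>();
    // Records the actions taken, in order, so they can be inspected.
    final List<String> log = new ArrayList<>();

    // Assumed handshake: succeeds only when the cluster names match.
    private boolean handshake(String localCluster, String remoteCluster) {
        return localCluster.equals(remoteCluster);
    }

    void ping(String node, String localCluster, String remoteCluster) {
        if (!connected.contains(node)) {
            // Not connected: connect, then ping only after a successful handshake.
            connected.add(node);
            log.add("connect " + node);
            if (handshake(localCluster, remoteCluster)) {
                log.add("ping " + node);
            } else {
                // Failed handshake: drop the connection again.
                connected.remove(node);
                log.add("disconnect " + node);
            }
        } else {
            // Already connected: ping immediately, with no handshake.
            log.add("ping " + node);
        }
    }
}
```

Because the sketch is synchronous, it cannot exhibit the race itself;
it only shows that the already-connected branch sends a ping without
any cluster name check.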

When the ping timeout is set low, a subsequent batch of pings can race
against a connect/disconnect cycle from a prior batch of pings. In
particular, consider the following scenario:
 - node A from cluster X
 - node B from cluster Y
 - pings are initiated from node A with node B in the hosts list
 - node A will try to connect and handshake with B
 - the connection will succeed, and the handshake will eventually fail
   due to mismatched cluster names
 - on a short timeout, a second batch of pings will fire before node A
   has disconnected from node B; on this batch node A will see that it
   is still connected to node B, so it will immediately fire a ping to
   node B and node B will dutifully respond
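
From node B's side, the difference between replying unconditionally
and replying only on a cluster name match can be sketched as below.
This is a minimal illustration under assumptions: PingHandlerSketch,
its two methods, and the string reply are hypothetical and stand in
for the real request handler and response types.

```java
import java.util.Optional;

// Hypothetical stand-in for the ping request handler on the receiving node.
class PingHandlerSketch {
    private final String clusterName;

    PingHandlerSketch(String clusterName) {
        this.clusterName = clusterName;
    }

    // Old behavior: always reply, regardless of the sender's cluster.
    Optional<String> handleUnconditionally(String requestClusterName) {
        return Optional.of("pong from " + clusterName);
    }

    // New behavior: reply only when the sender belongs to the same cluster.
    Optional<String> handle(String requestClusterName) {
        if (requestClusterName.equals(clusterName)) {
            return Optional.of("pong from " + clusterName);
        }
        return Optional.empty();
    }
}
```

With node B in cluster Y, a ping carrying cluster name X gets a reply
from handleUnconditionally but none from handle, so a ping that slips
through on a stale connection no longer produces a response.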

Relates #21894
Jason Tedor 2016-11-30 15:09:42 -05:00 committed by GitHub
parent 103984a4a1
commit c90ba67abb
2 changed files with 6 additions and 2 deletions

@@ -584,7 +584,6 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
         List<PingResponse> pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses);
         pingResponses.add(createPingResponse(contextProvider.nodes()));
         UnicastPingResponse unicastPingResponse = new UnicastPingResponse();
         unicastPingResponse.id = request.id;
         unicastPingResponse.pingResponses = pingResponses.toArray(new PingResponse[pingResponses.size()]);
@@ -596,8 +595,11 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
         @Override
         public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception {
-            channel.sendResponse(handlePingRequest(request));
+            if (request.pingResponse.clusterName().equals(clusterName)) {
+                channel.sendResponse(handlePingRequest(request));
+            }
         }
     }
     public static class UnicastPingRequest extends TransportRequest {

@@ -334,10 +334,12 @@ public class TransportService extends AbstractLifecycleComponent {
         if (node.equals(localNode)) {
             return localNode;
         }
+        logger.trace("connecting with node [{}] to perform handshake", node);
         transport.connectToNode(node, ConnectionProfile.LIGHT_PROFILE);
         try {
             return handshake(node, handshakeTimeout, checkClusterName);
         } catch (ConnectTransportException | IllegalStateException e) {
+            logger.trace((Supplier<?>) () -> new ParameterizedMessage("disconnecting from node [{}] after failed handshake", node), e);
             transport.disconnectFromNode(node);
             throw e;
         }