When a node disconnects from the cluster (not enough master nodes, or a client node) and rejoins it might not update its internal routing table, closes #

This commit is contained in:
Shay Banon 2012-04-30 00:57:52 +03:00
parent 5c6d8314c0
commit deff094343
2 changed files with 14 additions and 4 deletions

View File

@ -610,7 +610,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
.build(); .build();
// clear the routing table, we have no master, so we need to recreate the routing when we reform the cluster // clear the routing table, we have no master, so we need to recreate the routing when we reform the cluster
RoutingTable routingTable = RoutingTable.builder().version(clusterState.routingTable().version()).build(); RoutingTable routingTable = RoutingTable.builder().build();
// we also clean the metadata, since we are going to recover it if we become master // we also clean the metadata, since we are going to recover it if we become master
MetaData metaData = MetaData.builder().build(); MetaData metaData = MetaData.builder().build();

View File

@ -185,6 +185,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(sendPingsHandler.id()); ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(sendPingsHandler.id());
listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
transportService.disconnectFromNode(node); transportService.disconnectFromNode(node);
} }
sendPingsHandler.close(); sendPingsHandler.close();
@ -268,12 +269,21 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
try { try {
// connect to the node, see if we manage to do it, if not, bail // connect to the node, see if we manage to do it, if not, bail
if (!nodeFoundByAddress) { if (!nodeFoundByAddress) {
logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), nodeToSend);
transportService.connectToNodeLight(nodeToSend); transportService.connectToNodeLight(nodeToSend);
} else { } else {
logger.trace("[{}] connecting to {}", sendPingsHandler.id(), nodeToSend);
transportService.connectToNode(nodeToSend); transportService.connectToNode(nodeToSend);
} }
// we are connected, send the ping request logger.trace("[{}] connected to {}", sendPingsHandler.id(), node);
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend); if (receivedResponses.containsKey(sendPingsHandler.id())) {
// we are connected and still in progress, send the ping request
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend);
} else {
// connect took too long, just log it and bail
latch.countDown();
logger.trace("[{}] connect to {} was too long outside of ping window, bailing", sendPingsHandler.id(), node);
}
} catch (ConnectTransportException e) { } catch (ConnectTransportException e) {
// can't connect to the node // can't connect to the node
logger.trace("[{}] failed to connect to {}", e, sendPingsHandler.id(), nodeToSend); logger.trace("[{}] failed to connect to {}", e, sendPingsHandler.id(), nodeToSend);
@ -295,7 +305,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
} }
private void sendPingRequestToNode(final int id, TimeValue timeout, UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) { private void sendPingRequestToNode(final int id, TimeValue timeout, UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
logger.trace("[{}] connecting to {}", id, nodeToSend); logger.trace("[{}] sending to {}", id, nodeToSend);
transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() { transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {
@Override @Override