Transport Client: Improve remote node freeze handling by adding another timeout layer, closes #1653.
This commit is contained in:
parent
9194d36a64
commit
eb4f6709d9
|
@ -55,6 +55,8 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
|
||||
private final TimeValue nodesSamplerInterval;
|
||||
|
||||
private final long pingTimeout;
|
||||
|
||||
private final ClusterName clusterName;
|
||||
|
||||
private final TransportService transportService;
|
||||
|
@ -87,6 +89,7 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
this.threadPool = threadPool;
|
||||
|
||||
this.nodesSamplerInterval = componentSettings.getAsTime("nodes_sampler_interval", timeValueSeconds(5));
|
||||
this.pingTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(5)).millis();
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("node_sampler_interval[" + nodesSamplerInterval + "]");
|
||||
|
@ -242,9 +245,13 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
class ScheduledNodeSampler implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
nodesSampler.sample();
|
||||
if (!closed) {
|
||||
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
|
||||
try {
|
||||
nodesSampler.sample();
|
||||
if (!closed) {
|
||||
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to sample", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -262,24 +269,28 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
try {
|
||||
transportService.connectToNode(node);
|
||||
} catch (Exception e) {
|
||||
logger.debug("Failed to connect to node " + node + ", removed from nodes list", e);
|
||||
logger.debug("failed to connect to node [{}], removed from nodes list", e, node);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
try {
|
||||
NodesInfoResponse nodeInfo = transportService.submitRequest(node, NodesInfoAction.NAME, Requests.nodesInfoRequest("_local"), new FutureTransportResponseHandler<NodesInfoResponse>() {
|
||||
@Override
|
||||
public NodesInfoResponse newInstance() {
|
||||
return new NodesInfoResponse();
|
||||
}
|
||||
}).txGet();
|
||||
NodesInfoResponse nodeInfo = transportService.submitRequest(node, NodesInfoAction.NAME,
|
||||
Requests.nodesInfoRequest("_local"),
|
||||
TransportRequestOptions.options().withTimeout(pingTimeout),
|
||||
new FutureTransportResponseHandler<NodesInfoResponse>() {
|
||||
@Override
|
||||
public NodesInfoResponse newInstance() {
|
||||
return new NodesInfoResponse();
|
||||
}
|
||||
}).txGet();
|
||||
if (!clusterName.equals(nodeInfo.clusterName())) {
|
||||
logger.warn("Node {} not part of the cluster {}, ignoring...", node, clusterName);
|
||||
logger.warn("node {} not part of the cluster {}, ignoring...", node, clusterName);
|
||||
} else {
|
||||
newNodes.add(node);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to get node info for {}", e, node);
|
||||
logger.info("failed to get node info for {}, disconnecting...", e, node);
|
||||
transportService.disconnectFromNode(node);
|
||||
}
|
||||
}
|
||||
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
|
||||
|
@ -311,33 +322,45 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
transportService.connectToNode(listedNode); // make sure we are connected to it
|
||||
transportService.sendRequest(listedNode, NodesInfoAction.NAME, Requests.nodesInfoRequest("_all"), new BaseTransportResponseHandler<NodesInfoResponse>() {
|
||||
|
||||
@Override
|
||||
public NodesInfoResponse newInstance() {
|
||||
return new NodesInfoResponse();
|
||||
if (!transportService.nodeConnected(listedNode)) {
|
||||
try {
|
||||
transportService.connectToNode(listedNode);
|
||||
} catch (Exception e) {
|
||||
logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode);
|
||||
return;
|
||||
}
|
||||
}
|
||||
transportService.sendRequest(listedNode, NodesInfoAction.NAME,
|
||||
Requests.nodesInfoRequest("_all"),
|
||||
TransportRequestOptions.options().withTimeout(pingTimeout),
|
||||
new BaseTransportResponseHandler<NodesInfoResponse>() {
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
@Override
|
||||
public NodesInfoResponse newInstance() {
|
||||
return new NodesInfoResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(NodesInfoResponse response) {
|
||||
nodesInfoResponses.add(response);
|
||||
latch.countDown();
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.debug("Failed to get node info from " + listedNode + ", removed from nodes list", exp);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleResponse(NodesInfoResponse response) {
|
||||
nodesInfoResponses.add(response);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException e) {
|
||||
logger.info("failed to get node info for {}, disconnecting...", e, listedNode);
|
||||
transportService.disconnectFromNode(listedNode);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.debug("Failed to get node info from " + listedNode + ", removed from nodes list", e);
|
||||
logger.info("failed to get node info for {}, disconnecting...", e, listedNode);
|
||||
transportService.disconnectFromNode(listedNode);
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
@ -354,7 +377,7 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
for (NodesInfoResponse nodesInfoResponse : nodesInfoResponses) {
|
||||
for (NodeInfo nodeInfo : nodesInfoResponse) {
|
||||
if (!clusterName.equals(nodesInfoResponse.clusterName())) {
|
||||
logger.warn("Node {} not part of the cluster {}, ignoring...", nodeInfo.node(), clusterName);
|
||||
logger.warn("node {} not part of the cluster {}, ignoring...", nodeInfo.node(), clusterName);
|
||||
} else {
|
||||
if (nodeInfo.node().dataNode()) { // only add data nodes to connect to
|
||||
newNodes.add(nodeInfo.node());
|
||||
|
@ -369,7 +392,7 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
transportService.connectToNode(node);
|
||||
} catch (Exception e) {
|
||||
it.remove();
|
||||
logger.debug("Failed to connect to discovered node [" + node + "]", e);
|
||||
logger.debug("failed to connect to discovered node [" + node + "]", e);
|
||||
}
|
||||
}
|
||||
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
|
||||
|
|
Loading…
Reference in New Issue