add reconnection code between nodes that are not masters

This commit is contained in:
kimchy 2010-12-23 17:26:32 +02:00
parent ff5990daec
commit f6afc01fe5
1 changed files with 35 additions and 0 deletions

View File

@ -42,6 +42,7 @@ import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.*; import static java.util.concurrent.Executors.*;
@ -63,6 +64,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final TransportService transportService; private final TransportService transportService;
private final TimeValue reconnectInterval;
private volatile ExecutorService updateTasksExecutor; private volatile ExecutorService updateTasksExecutor;
private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>(); private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
@ -71,6 +74,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private volatile ClusterState clusterState = newClusterStateBuilder().build(); private volatile ClusterState clusterState = newClusterStateBuilder().build();
private volatile ScheduledFuture reconnectToNodes;
@Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, OperationRouting operationRouting, TransportService transportService, ThreadPool threadPool, @Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, OperationRouting operationRouting, TransportService transportService, ThreadPool threadPool,
TimerService timerService) { TimerService timerService) {
super(settings); super(settings);
@ -79,14 +84,18 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
this.discoveryService = discoveryService; this.discoveryService = discoveryService;
this.threadPool = threadPool; this.threadPool = threadPool;
this.timerService = timerService; this.timerService = timerService;
this.reconnectInterval = componentSettings.getAsTime("reconnect_interval", TimeValue.timeValueSeconds(30));
} }
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
this.clusterState = newClusterStateBuilder().build(); this.clusterState = newClusterStateBuilder().build();
this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "clusterService#updateTask")); this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "clusterService#updateTask"));
this.reconnectToNodes = threadPool.scheduleWithFixedDelay(new ReconnectToNodes(), reconnectInterval);
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
this.reconnectToNodes.cancel(true);
for (Tuple<Timeout, NotifyTimeout> onGoingTimeout : onGoingTimeouts) { for (Tuple<Timeout, NotifyTimeout> onGoingTimeout : onGoingTimeouts) {
onGoingTimeout.v1().cancel(); onGoingTimeout.v1().cancel();
onGoingTimeout.v2().listener.onClose(); onGoingTimeout.v2().listener.onClose();
@ -262,4 +271,30 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
// note, we rely on the listener to remove itself in case of timeout if needed // note, we rely on the listener to remove itself in case of timeout if needed
} }
} }
private class ReconnectToNodes implements Runnable {
@Override public void run() {
// master node will check against all nodes if its alive with certain discoveries implementations,
// but we can't rely on that, so we check on it as well
for (DiscoveryNode node : clusterState.nodes()) {
if (lifecycle.stoppedOrClosed()) {
return;
}
if (clusterState.nodes().nodeExists(node.id())) { // we double check existence of node since connectToNode might take time...
if (!transportService.nodeConnected(node)) {
try {
transportService.connectToNode(node);
} catch (Exception e) {
if (lifecycle.stoppedOrClosed()) {
return;
}
if (clusterState.nodes().nodeExists(node.id())) { // double check here as well, maybe its gone?
logger.warn("failed to reconnect to node {}", e, node);
}
}
}
}
}
}
}
} }