interrupt joining the cluster thread if shutting down

This commit is contained in:
kimchy 2010-12-23 16:11:43 +02:00
parent 5f25ae4f2f
commit 42e8567477
1 changed files with 25 additions and 12 deletions

View File

@ -96,6 +96,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private volatile DiscoveryNodes latestDiscoNodes; private volatile DiscoveryNodes latestDiscoNodes;
private volatile Thread currentJoinThread;
private final AtomicBoolean initialStateSent = new AtomicBoolean(); private final AtomicBoolean initialStateSent = new AtomicBoolean();
@Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, @Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
@ -136,11 +138,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
pingService.start(); pingService.start();
// do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
threadPool.cached().execute(new Runnable() { asyncJoinCluster();
@Override public void run() {
joinCluster();
}
});
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
@ -170,6 +168,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
} }
} }
master = false; master = false;
if (currentJoinThread != null) {
try {
currentJoinThread.interrupt();
} catch (Exception e) {
// ignore
}
}
} }
@Override protected void doClose() throws ElasticSearchException { @Override protected void doClose() throws ElasticSearchException {
@ -214,7 +219,20 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
publishClusterState.publish(clusterState); publishClusterState.publish(clusterState);
} }
private void joinCluster() { private void asyncJoinCluster() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
currentJoinThread = Thread.currentThread();
try {
innterJoinCluster();
} finally {
currentJoinThread = null;
}
}
});
}
private void innterJoinCluster() {
boolean retry = true; boolean retry = true;
while (retry) { while (retry) {
if (lifecycle.stoppedOrClosed()) { if (lifecycle.stoppedOrClosed()) {
@ -371,12 +389,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
routingTable = RoutingTable.newRoutingTableBuilder().build(); routingTable = RoutingTable.newRoutingTableBuilder().build();
} }
masterFD.stop("no master elected since master left (reason = " + reason + ")"); masterFD.stop("no master elected since master left (reason = " + reason + ")");
// try and join the cluster again... asyncJoinCluster();
threadPool.cached().execute(new Runnable() {
@Override public void run() {
joinCluster();
}
});
} }
latestDiscoNodes = builder.build(); latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState) return newClusterStateBuilder().state(currentState)