diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index f8eb8069cb3..48167e8faf9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -96,6 +96,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private volatile DiscoveryNodes latestDiscoNodes; + private volatile Thread currentJoinThread; + private final AtomicBoolean initialStateSent = new AtomicBoolean(); @Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, @@ -136,11 +138,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen pingService.start(); // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered - threadPool.cached().execute(new Runnable() { - @Override public void run() { - joinCluster(); - } - }); + asyncJoinCluster(); } @Override protected void doStop() throws ElasticSearchException { @@ -170,6 +168,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } } master = false; + if (currentJoinThread != null) { + try { + currentJoinThread.interrupt(); + } catch (Exception e) { + // ignore + } + } } @Override protected void doClose() throws ElasticSearchException { @@ -214,7 +219,20 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen publishClusterState.publish(clusterState); } - private void joinCluster() { + private void asyncJoinCluster() { + threadPool.cached().execute(new Runnable() { + @Override public void run() { + currentJoinThread = Thread.currentThread(); + try { + innterJoinCluster(); + } finally { + currentJoinThread = null; + } + } + }); + } + + private void innterJoinCluster() { boolean retry = true; while (retry) { if (lifecycle.stoppedOrClosed()) { @@ -371,12 +389,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen routingTable = RoutingTable.newRoutingTableBuilder().build(); } masterFD.stop("no master elected since master left (reason = " + reason + ")"); - // try and join the cluster again... - threadPool.cached().execute(new Runnable() { - @Override public void run() { - joinCluster(); - } - }); + asyncJoinCluster(); } latestDiscoNodes = builder.build(); return newClusterStateBuilder().state(currentState)