From 42e85674778ad1f26695f6c632a0a9fab6f1174c Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 23 Dec 2010 16:11:43 +0200 Subject: [PATCH] interrupt joining the cluster thread if shutting down --- .../discovery/zen/ZenDiscovery.java | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index f8eb8069cb3..48167e8faf9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -96,6 +96,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private volatile DiscoveryNodes latestDiscoNodes; + private volatile Thread currentJoinThread; + private final AtomicBoolean initialStateSent = new AtomicBoolean(); @Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, @@ -136,11 +138,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen pingService.start(); // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered - threadPool.cached().execute(new Runnable() { - @Override public void run() { - joinCluster(); - } - }); + asyncJoinCluster(); } @Override protected void doStop() throws ElasticSearchException { @@ -170,6 +168,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } } master = false; + if (currentJoinThread != null) { + try { + currentJoinThread.interrupt(); + } catch (Exception e) { + // ignore + } + } } @Override protected void doClose() throws ElasticSearchException { @@ -214,7 +219,20 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen publishClusterState.publish(clusterState); } - private void joinCluster() { + private void asyncJoinCluster() { + threadPool.cached().execute(new Runnable() { + @Override public void run() { + currentJoinThread = Thread.currentThread(); + try { + innterJoinCluster(); + } finally { + currentJoinThread = null; + } + } + }); + } + + private void innterJoinCluster() { boolean retry = true; while (retry) { if (lifecycle.stoppedOrClosed()) { @@ -371,12 +389,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen routingTable = RoutingTable.newRoutingTableBuilder().build(); } masterFD.stop("no master elected since master left (reason = " + reason + ")"); - // try and join the cluster again... - threadPool.cached().execute(new Runnable() { - @Override public void run() { - joinCluster(); - } - }); + asyncJoinCluster(); } latestDiscoNodes = builder.build(); return newClusterStateBuilder().state(currentState)