From 1f906804ffee9ca910467fa091154e6945703649 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sat, 28 Sep 2013 20:57:58 +0200 Subject: [PATCH] Catch RejectedExceutionException in ZenPing et al. If nodes are shutting down we close thread pools and throw 'EsRejectedExcutionException'. This commit handles these exceptions gracefully if throw during Ping execution. --- .../discovery/zen/ping/ZenPing.java | 4 ++- .../discovery/zen/ping/ZenPingService.java | 11 +++++--- .../zen/ping/multicast/MulticastZenPing.java | 25 ++++++++++++------- 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java index 1172dbb3f37..d65321051e6 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java @@ -48,7 +48,9 @@ public interface ZenPing extends LifecycleComponent { void onPing(PingResponse[] pings); } - public class PingResponse implements Streamable { + public static class PingResponse implements Streamable { + + public static PingResponse[] EMPTY = new PingResponse[0]; private ClusterName clusterName; diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java index 6a0343188d0..079f12ed36f 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing; import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; @@ -144,7 +145,12 @@ public class ZenPingService extends AbstractLifecycleComponent implemen ImmutableList zenPings = this.zenPings; CompoundPingListener compoundPingListener = new CompoundPingListener(listener, zenPings); for (ZenPing zenPing : zenPings) { - zenPing.ping(compoundPingListener, timeout); + try { + zenPing.ping(compoundPingListener, timeout); + } catch (EsRejectedExecutionException ex) { + logger.debug("Ping execution rejected", ex); + compoundPingListener.onPing(null); + } } } @@ -152,15 +158,12 @@ public class ZenPingService extends AbstractLifecycleComponent implemen private final PingListener listener; - private final ImmutableList zenPings; - private final AtomicInteger counter; private ConcurrentMap responses = ConcurrentCollections.newConcurrentMap(); private CompoundPingListener(PingListener listener, ImmutableList zenPings) { this.listener = listener; - this.zenPings = zenPings; this.counter = new AtomicInteger(zenPings.size()); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 8f9acb18ed5..c6cf52a979f 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; @@ -199,18 +200,24 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem public PingResponse[] pingAndWait(TimeValue timeout) { final AtomicReference response = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - ping(new PingListener() { - @Override - public void onPing(PingResponse[] pings) { - response.set(pings); - latch.countDown(); - } - }, timeout); + try { + ping(new PingListener() { + @Override + public void onPing(PingResponse[] pings) { + response.set(pings); + latch.countDown(); + } + }, timeout); + } catch (EsRejectedExecutionException ex) { + logger.debug("Ping execution rejected", ex); + return PingResponse.EMPTY; + } try { latch.await(); return response.get(); } catch (InterruptedException e) { - return null; + Thread.currentThread().interrupt(); + return PingResponse.EMPTY; } } @@ -220,7 +227,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem threadPool.generic().execute(new Runnable() { @Override public void run() { - listener.onPing(new PingResponse[0]); + listener.onPing(PingResponse.EMPTY); } }); return;