Catch RejectedExceutionException in ZenPing et al.

If nodes are shutting down we close thread pools and throw
'EsRejectedExcutionException'. This commit handles these exceptions
gracefully if throw during Ping execution.
This commit is contained in:
Simon Willnauer 2013-09-28 20:57:58 +02:00
parent 6b6a468327
commit 1f906804ff
3 changed files with 26 additions and 14 deletions

View File

@ -48,7 +48,9 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
void onPing(PingResponse[] pings); void onPing(PingResponse[] pings);
} }
public class PingResponse implements Streamable { public static class PingResponse implements Streamable {
public static PingResponse[] EMPTY = new PingResponse[0];
private ClusterName clusterName; private ClusterName clusterName;

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing; import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
@ -144,7 +145,12 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
ImmutableList<? extends ZenPing> zenPings = this.zenPings; ImmutableList<? extends ZenPing> zenPings = this.zenPings;
CompoundPingListener compoundPingListener = new CompoundPingListener(listener, zenPings); CompoundPingListener compoundPingListener = new CompoundPingListener(listener, zenPings);
for (ZenPing zenPing : zenPings) { for (ZenPing zenPing : zenPings) {
try {
zenPing.ping(compoundPingListener, timeout); zenPing.ping(compoundPingListener, timeout);
} catch (EsRejectedExecutionException ex) {
logger.debug("Ping execution rejected", ex);
compoundPingListener.onPing(null);
}
} }
} }
@ -152,15 +158,12 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
private final PingListener listener; private final PingListener listener;
private final ImmutableList<? extends ZenPing> zenPings;
private final AtomicInteger counter; private final AtomicInteger counter;
private ConcurrentMap<DiscoveryNode, PingResponse> responses = ConcurrentCollections.newConcurrentMap(); private ConcurrentMap<DiscoveryNode, PingResponse> responses = ConcurrentCollections.newConcurrentMap();
private CompoundPingListener(PingListener listener, ImmutableList<? extends ZenPing> zenPings) { private CompoundPingListener(PingListener listener, ImmutableList<? extends ZenPing> zenPings) {
this.listener = listener; this.listener = listener;
this.zenPings = zenPings;
this.counter = new AtomicInteger(zenPings.size()); this.counter = new AtomicInteger(zenPings.size());
} }

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
@ -199,6 +200,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
public PingResponse[] pingAndWait(TimeValue timeout) { public PingResponse[] pingAndWait(TimeValue timeout) {
final AtomicReference<PingResponse[]> response = new AtomicReference<PingResponse[]>(); final AtomicReference<PingResponse[]> response = new AtomicReference<PingResponse[]>();
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
try {
ping(new PingListener() { ping(new PingListener() {
@Override @Override
public void onPing(PingResponse[] pings) { public void onPing(PingResponse[] pings) {
@ -206,11 +208,16 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
latch.countDown(); latch.countDown();
} }
}, timeout); }, timeout);
} catch (EsRejectedExecutionException ex) {
logger.debug("Ping execution rejected", ex);
return PingResponse.EMPTY;
}
try { try {
latch.await(); latch.await();
return response.get(); return response.get();
} catch (InterruptedException e) { } catch (InterruptedException e) {
return null; Thread.currentThread().interrupt();
return PingResponse.EMPTY;
} }
} }
@ -220,7 +227,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
threadPool.generic().execute(new Runnable() { threadPool.generic().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
listener.onPing(new PingResponse[0]); listener.onPing(PingResponse.EMPTY);
} }
}); });
return; return;