diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 0d5d3ab2726..be850319113 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -21,6 +21,7 @@ package org.elasticsearch.discovery.zen.ping.unicast; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.collect.Lists; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; @@ -35,18 +36,20 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; +import java.io.Closeable; import java.io.IOException; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -83,13 +86,17 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen // used as a node id prefix for nodes/address we temporarily connect to private static final String UNICAST_NODE_PREFIX = "#zen_unicast_"; - private final Map receivedResponses = newConcurrentMap(); + private final Map receivedResponses = newConcurrentMap(); // a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes) private final Queue temporalResponses = ConcurrentCollections.newQueue(); private final CopyOnWriteArrayList hostsProviders = new CopyOnWriteArrayList<>(); + private final ExecutorService unicastConnectExecutor; + + private volatile boolean closed = false; + public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version, ElectMasterService electMasterService, @Nullable Set unicastHostsProviders) { super(settings); @@ -128,6 +135,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]); transportService.registerHandler(ACTION_NAME, new UnicastPingRequestHandler()); + + ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); + unicastConnectExecutor = EsExecutors.newScaling(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory); } @Override @@ -141,6 +151,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen @Override protected void doClose() throws ElasticsearchException { transportService.removeHandler(ACTION_NAME); + ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS); + try { + IOUtils.close(receivedResponses.values()); + } catch (IOException e) { + throw new ElasticsearchException("Error wile closing send ping handlers", e); + } + closed = true; } public void addHostsProvider(UnicastHostsProvider provider) { @@ -184,45 +201,56 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen @Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticsearchException { final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet()); - receivedResponses.put(sendPingsHandler.id(), new PingCollection()); - sendPings(timeout, null, sendPingsHandler); - threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() { + receivedResponses.put(sendPingsHandler.id(), sendPingsHandler); + try { + sendPings(timeout, null, sendPingsHandler); + } catch (RejectedExecutionException e) { + logger.debug("Ping execution rejected", e); + // The RejectedExecutionException can come from the fact unicastConnectExecutor is at its max down in sendPings + // But don't bail here, we can retry later on after the send ping has been scheduled. + } + threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override - public void run() { - try { - sendPings(timeout, null, sendPingsHandler); - threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() { - @Override - public void run() { - try { - sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler); - PingCollection responses = receivedResponses.remove(sendPingsHandler.id()); - sendPingsHandler.close(); - for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { - logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node); - transportService.disconnectFromNode(node); - } - listener.onPing(responses.toArray()); - } catch (EsRejectedExecutionException ex) { - logger.debug("Ping execution rejected", ex); - } + protected void doRun() { + sendPings(timeout, null, sendPingsHandler); + threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler); + sendPingsHandler.close(); + for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { + logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node); + transportService.disconnectFromNode(node); } - }); - } catch (EsRejectedExecutionException ex) { - logger.debug("Ping execution rejected", ex); - } + listener.onPing(sendPingsHandler.pingCollection().toArray()); + } + + @Override + public void onFailure(Throwable t) { + logger.debug("Ping execution failed", t); + sendPingsHandler.close(); + } + }); + } + + @Override + public void onFailure(Throwable t) { + logger.debug("Ping execution failed", t); + sendPingsHandler.close(); } }); } - class SendPingsHandler { + class SendPingsHandler implements Closeable { private final int id; - private volatile ExecutorService executor; private final Set nodeToDisconnect = ConcurrentCollections.newConcurrentSet(); - private volatile boolean closed; + private final PingCollection pingCollection; + + private AtomicBoolean closed = new AtomicBoolean(false); SendPingsHandler(int id) { this.id = id; + this.pingCollection = new PingCollection(); } public int id() { @@ -230,21 +258,17 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen } public boolean isClosed() { - return this.closed; + return this.closed.get(); } - public Executor executor() { - if (executor == null) { - ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); - executor = EsExecutors.newScaling(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory); - } - return executor; + public PingCollection pingCollection() { + return pingCollection; } public void close() { - closed = true; - ThreadPool.terminate(executor, 0, TimeUnit.SECONDS); - executor = null; + if (closed.compareAndSet(false, true)) { + receivedResponses.remove(id); + } } } @@ -315,7 +339,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen } // fork the connection to another thread final DiscoveryNode finalNodeToSend = nodeToSend; - sendPingsHandler.executor().execute(new Runnable() { + unicastConnectExecutor.execute(new Runnable() { @Override public void run() { if (sendPingsHandler.isClosed()) { @@ -395,11 +419,14 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", id, pingResponse.node(), pingResponse.clusterName().value()); continue; } - PingCollection responses = receivedResponses.get(response.id); - if (responses == null) { - logger.warn("received ping response {} with no matching handler id [{}]", pingResponse, response.id); + SendPingsHandler sendPingsHandler = receivedResponses.get(response.id); + if (sendPingsHandler == null) { + if (!closed) { + // Only log when we're not closing the node. Having no send ping handler is then expected + logger.warn("received ping response {} with no matching handler id [{}]", pingResponse, response.id); + } } else { - responses.addPing(pingResponse); + sendPingsHandler.pingCollection().addPing(pingResponse); } } } finally {