If a node is being shutdown some in flight ping request may be executed. Make sure to keep track of those ping requests and close the unicast connect executor service.

Closes #7903
This commit is contained in:
Martijn van Groningen 2014-09-26 22:31:37 +02:00
parent de0cca4cef
commit 71adb3ada2
1 changed files with 72 additions and 45 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.discovery.zen.ping.unicast;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
@ -35,18 +36,20 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -83,13 +86,17 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
// used as a node id prefix for nodes/address we temporarily connect to
private static final String UNICAST_NODE_PREFIX = "#zen_unicast_";
private final Map<Integer, PingCollection> receivedResponses = newConcurrentMap();
private final Map<Integer, SendPingsHandler> receivedResponses = newConcurrentMap();
// a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes)
private final Queue<PingResponse> temporalResponses = ConcurrentCollections.newQueue();
private final CopyOnWriteArrayList<UnicastHostsProvider> hostsProviders = new CopyOnWriteArrayList<>();
private final ExecutorService unicastConnectExecutor;
private volatile boolean closed = false;
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName,
Version version, ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
@ -128,6 +135,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]);
transportService.registerHandler(ACTION_NAME, new UnicastPingRequestHandler());
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
unicastConnectExecutor = EsExecutors.newScaling(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory);
}
@Override
@ -141,6 +151,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
@Override
protected void doClose() throws ElasticsearchException {
transportService.removeHandler(ACTION_NAME);
ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS);
try {
IOUtils.close(receivedResponses.values());
} catch (IOException e) {
throw new ElasticsearchException("Error wile closing send ping handlers", e);
}
closed = true;
}
public void addHostsProvider(UnicastHostsProvider provider) {
@ -184,45 +201,56 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
@Override
public void ping(final PingListener listener, final TimeValue timeout) throws ElasticsearchException {
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet());
receivedResponses.put(sendPingsHandler.id(), new PingCollection());
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler);
try {
sendPings(timeout, null, sendPingsHandler);
} catch (RejectedExecutionException e) {
logger.debug("Ping execution rejected", e);
// The RejectedExecutionException can come from the fact unicastConnectExecutor is at its max down in sendPings
// But don't bail here, we can retry later on after the send ping has been scheduled.
}
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void run() {
try {
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
try {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);
PingCollection responses = receivedResponses.remove(sendPingsHandler.id());
sendPingsHandler.close();
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
transportService.disconnectFromNode(node);
}
listener.onPing(responses.toArray());
} catch (EsRejectedExecutionException ex) {
logger.debug("Ping execution rejected", ex);
}
protected void doRun() {
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);
sendPingsHandler.close();
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
transportService.disconnectFromNode(node);
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Ping execution rejected", ex);
}
listener.onPing(sendPingsHandler.pingCollection().toArray());
}
@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
}
@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
}
class SendPingsHandler {
class SendPingsHandler implements Closeable {
private final int id;
private volatile ExecutorService executor;
private final Set<DiscoveryNode> nodeToDisconnect = ConcurrentCollections.newConcurrentSet();
private volatile boolean closed;
private final PingCollection pingCollection;
private AtomicBoolean closed = new AtomicBoolean(false);
SendPingsHandler(int id) {
this.id = id;
this.pingCollection = new PingCollection();
}
public int id() {
@ -230,21 +258,17 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
public boolean isClosed() {
return this.closed;
return this.closed.get();
}
public Executor executor() {
if (executor == null) {
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
executor = EsExecutors.newScaling(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory);
}
return executor;
public PingCollection pingCollection() {
return pingCollection;
}
public void close() {
closed = true;
ThreadPool.terminate(executor, 0, TimeUnit.SECONDS);
executor = null;
if (closed.compareAndSet(false, true)) {
receivedResponses.remove(id);
}
}
}
@ -315,7 +339,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
// fork the connection to another thread
final DiscoveryNode finalNodeToSend = nodeToSend;
sendPingsHandler.executor().execute(new Runnable() {
unicastConnectExecutor.execute(new Runnable() {
@Override
public void run() {
if (sendPingsHandler.isClosed()) {
@ -395,11 +419,14 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", id, pingResponse.node(), pingResponse.clusterName().value());
continue;
}
PingCollection responses = receivedResponses.get(response.id);
if (responses == null) {
logger.warn("received ping response {} with no matching handler id [{}]", pingResponse, response.id);
SendPingsHandler sendPingsHandler = receivedResponses.get(response.id);
if (sendPingsHandler == null) {
if (!closed) {
// Only log when we're not closing the node. Having no send ping handler is then expected
logger.warn("received ping response {} with no matching handler id [{}]", pingResponse, response.id);
}
} else {
responses.addPing(pingResponse);
sendPingsHandler.pingCollection().addPing(pingResponse);
}
}
} finally {