Improve unicast discovery to use fewer resources by connecting through a dedicated thread pool capped at `concurrent_connects` threads (defaults to 10)

This commit is contained in:
Shay Banon 2011-08-04 14:33:06 +03:00
parent cbb95dee17
commit 1908639749
1 changed file with 64 additions and 28 deletions

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -33,6 +32,9 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
@ -55,8 +57,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -79,6 +83,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private final ClusterName clusterName;
private final int concurrentConnects;
private final DiscoveryNode[] nodes;
@ -103,13 +108,14 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
this.transportService = transportService;
this.clusterName = clusterName;
this.concurrentConnects = componentSettings.getAsInt("concurrent_connects", 10);
String[] hostArr = componentSettings.getAsArray("hosts");
// trim the hosts
for (int i = 0; i < hostArr.length; i++) {
hostArr[i] = hostArr[i].trim();
}
List<String> hosts = Lists.newArrayList(hostArr);
logger.debug("using initial hosts {}", hosts);
logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects);
List<DiscoveryNode> nodes = Lists.newArrayList();
int idCounter = 0;
@ -169,27 +175,61 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
@Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException {
final AtomicBoolean done = new AtomicBoolean();
final int id = pingIdGenerator.incrementAndGet();
receivedResponses.put(id, new ConcurrentHashMap<DiscoveryNode, PingResponse>());
final Set<DiscoveryNode> nodesToDisconnect1 = sendPings(id, timeout, false, done);
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingIdGenerator.incrementAndGet());
receivedResponses.put(sendPingsHandler.id(), new ConcurrentHashMap<DiscoveryNode, PingResponse>());
sendPings(timeout, false, sendPingsHandler);
threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
final Set<DiscoveryNode> nodesToDisconnect = Sets.newHashSet(nodesToDisconnect1);
nodesToDisconnect.addAll(sendPings(id, timeout, true, done));
done.set(true);
for (DiscoveryNode node : nodesToDisconnect) {
sendPings(timeout, true, sendPingsHandler);
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
transportService.disconnectFromNode(node);
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(id);
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(sendPingsHandler.id());
sendPingsHandler.close();
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
}
});
}
Set<DiscoveryNode> sendPings(final int id, final TimeValue timeout, boolean wait, final AtomicBoolean done) {
/**
 * Holds the state of a single ping round: the round id, the lazily created
 * executor used to fork node connections, and the set of nodes that were
 * registered for disconnection once the round completes.
 *
 * <p>Instances are touched by more than one thread (the thread calling
 * {@code ping}, the scheduled timeout task, and the connect tasks running on
 * the executor), so creation and shutdown of the executor are serialized on
 * this handler's monitor.
 */
class SendPingsHandler {
    /** Identifier of the ping round this handler belongs to. */
    private final int id;
    /** Created on first use by {@link #executor()}; reset to {@code null} by {@link #close()}. */
    private volatile ExecutorService executor;
    /** Nodes recorded by {@code sendPings} for disconnection when the round ends. */
    private final Set<DiscoveryNode> nodeToDisconnect = ConcurrentCollections.newConcurrentSet();
    /** Set once {@link #close()} has been called; never reset. */
    private volatile boolean closed;

    SendPingsHandler(int id) {
        this.id = id;
    }

    /**
     * @return the id of the ping round this handler was created for
     */
    public int id() {
        return this.id;
    }

    /**
     * @return {@code true} once {@link #close()} has been invoked
     */
    public boolean isClosed() {
        return this.closed;
    }

    /**
     * Returns the executor used to fork connection attempts, creating it on
     * first use with at most {@code concurrentConnects} threads.
     *
     * <p>Synchronized so that two overlapping callers cannot each create a
     * pool, which would leave one of them without an owner to shut it down.
     *
     * <p>NOTE(review): calling this after {@link #close()} still creates a
     * fresh pool, exactly as before; callers are expected to check
     * {@link #isClosed()} first.
     *
     * @return the executor for this ping round, never {@code null}
     */
    public synchronized Executor executor() {
        if (executor == null) {
            ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
            // 60000 is presumably the idle keep-alive in milliseconds -- confirm against DynamicExecutors
            executor = DynamicExecutors.newScalingThreadPool(1, concurrentConnects, 60000, threadFactory);
        }
        return executor;
    }

    /**
     * Marks this handler as closed, shuts down the executor if one was
     * created, and clears the set of nodes to disconnect.
     *
     * <p>Synchronized with {@link #executor()} so that a pool being created
     * concurrently is either fully visible here (and shut down) or created
     * strictly after this method has finished.
     */
    public synchronized void close() {
        closed = true;
        // detach the pool first, then shut down the snapshot we own
        ExecutorService current = executor;
        executor = null;
        if (current != null) {
            current.shutdownNow();
        }
        nodeToDisconnect.clear();
    }
}
void sendPings(final TimeValue timeout, boolean wait, final SendPingsHandler sendPingsHandler) {
final UnicastPingRequest pingRequest = new UnicastPingRequest();
pingRequest.id = id;
pingRequest.id = sendPingsHandler.id();
pingRequest.timeout = timeout;
DiscoveryNodes discoNodes = nodesProvider.nodes();
pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName);
@ -199,30 +239,28 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
nodesToPing.addAll(provider.buildDynamicNodes());
}
Set<DiscoveryNode> nodesToDisconnect = Sets.newHashSet();
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
for (final DiscoveryNode node : nodesToPing) {
// make sure we are connected
boolean nodeFoundByAddressX;
DiscoveryNode nodeToSendX = discoNodes.findByAddress(node.address());
if (nodeToSendX != null) {
nodeFoundByAddressX = false;
nodeFoundByAddressX = true;
} else {
nodeToSendX = node;
nodeFoundByAddressX = true;
nodeFoundByAddressX = false;
}
final DiscoveryNode nodeToSend = nodeToSendX;
final boolean nodeFoundByAddress = nodeFoundByAddressX;
if (!transportService.nodeConnected(nodeToSend)) {
nodesToDisconnect.add(nodeToSend);
if (sendPingsHandler.isClosed()) {
return;
}
sendPingsHandler.nodeToDisconnect.add(nodeToSend);
// fork the connection to another thread
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
sendPingsHandler.executor().execute(new Runnable() {
@Override public void run() {
if (done.get()) {
return;
}
try {
// connect to the node, see if we manage to do it, if not, bail
if (!nodeFoundByAddress) {
@ -231,16 +269,16 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
transportService.connectToNode(nodeToSend);
}
// we are connected, send the ping request
sendPingRequestToNode(id, timeout, pingRequest, latch, node, nodeToSend);
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend);
} catch (ConnectTransportException e) {
// can't connect to the node
logger.trace("[{}] failed to connect to {}", e, id, nodeToSend);
logger.trace("[{}] failed to connect to {}", e, sendPingsHandler.id(), nodeToSend);
latch.countDown();
}
}
});
} else {
sendPingRequestToNode(id, timeout, pingRequest, latch, node, nodeToSend);
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend);
}
}
if (wait) {
@ -250,8 +288,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
// ignore
}
}
return nodesToDisconnect;
}
private void sendPingRequestToNode(final int id, TimeValue timeout, UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {