add done flag to not continue to connect if we already finished the unicast ping

This commit is contained in:
Shay Banon 2011-08-02 16:42:41 +03:00
parent e44fb27db1
commit bf9d5a0613
1 changed files with 9 additions and 3 deletions

View File

@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -168,13 +169,15 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
} }
@Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException { @Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException {
final AtomicBoolean done = new AtomicBoolean();
final int id = pingIdGenerator.incrementAndGet(); final int id = pingIdGenerator.incrementAndGet();
receivedResponses.put(id, new ConcurrentHashMap<DiscoveryNode, PingResponse>()); receivedResponses.put(id, new ConcurrentHashMap<DiscoveryNode, PingResponse>());
final Set<DiscoveryNode> nodesToDisconnect1 = sendPings(id, timeout, false); final Set<DiscoveryNode> nodesToDisconnect1 = sendPings(id, timeout, false, done);
threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() { threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() { @Override public void run() {
final Set<DiscoveryNode> nodesToDisconnect = Sets.newHashSet(nodesToDisconnect1); final Set<DiscoveryNode> nodesToDisconnect = Sets.newHashSet(nodesToDisconnect1);
nodesToDisconnect.addAll(sendPings(id, timeout, true)); nodesToDisconnect.addAll(sendPings(id, timeout, true, done));
done.set(true);
for (DiscoveryNode node : nodesToDisconnect) { for (DiscoveryNode node : nodesToDisconnect) {
transportService.disconnectFromNode(node); transportService.disconnectFromNode(node);
} }
@ -184,7 +187,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}); });
} }
Set<DiscoveryNode> sendPings(final int id, final TimeValue timeout, boolean wait) { Set<DiscoveryNode> sendPings(final int id, final TimeValue timeout, boolean wait, final AtomicBoolean done) {
final UnicastPingRequest pingRequest = new UnicastPingRequest(); final UnicastPingRequest pingRequest = new UnicastPingRequest();
pingRequest.id = id; pingRequest.id = id;
pingRequest.timeout = timeout; pingRequest.timeout = timeout;
@ -217,6 +220,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
// fork the connection to another thread // fork the connection to another thread
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() { threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
@Override public void run() { @Override public void run() {
if (done.get()) {
return;
}
try { try {
// connect to the node, see if we manage to do it, if not, bail // connect to the node, see if we manage to do it, if not, bail
if (!nodeFoundByAddress) { if (!nodeFoundByAddress) {