Transport Client: Adding more nodes causes more scheduled reconnect tasks, closes #1062.

This commit is contained in:
kimchy 2011-06-24 21:33:24 +03:00
parent e373bf09f2
commit 7acdda74f4
2 changed files with 30 additions and 38 deletions

View File

@ -24,24 +24,6 @@
<option name="LOCALE" />
<option name="OPEN_IN_BROWSER" value="true" />
</component>
<component name="NullableNotNullManager">
<option name="myDefaultNullable" value="org.jetbrains.annotations.Nullable" />
<option name="myDefaultNotNull" value="org.jetbrains.annotations.NotNull" />
<option name="myNullables">
<value>
<list size="0" />
</value>
</option>
<option name="myNotNulls">
<value>
<list size="3">
<item index="0" class="java.lang.String" itemvalue="org.jetbrains.annotations.NotNull" />
<item index="1" class="java.lang.String" itemvalue="javax.annotation.Nonnull" />
<item index="2" class="java.lang.String" itemvalue="edu.umd.cs.findbugs.annotations.NonNull" />
</list>
</value>
</option>
</component>
<component name="ProjectDetails">
<option name="projectName" value="elasticsearch" />
</component>

View File

@ -33,7 +33,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import java.util.HashSet;
import java.util.Iterator;
@ -66,7 +70,7 @@ public class TransportClientNodesService extends AbstractComponent {
private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();
private final Runnable nodesSampler;
private final NodeSampler nodesSampler;
private volatile ScheduledFuture nodesSamplerFuture;
@ -88,11 +92,11 @@ public class TransportClientNodesService extends AbstractComponent {
}
if (componentSettings.getAsBoolean("sniff", false)) {
this.nodesSampler = new ScheduledSniffNodesSampler();
this.nodesSampler = new SniffNodesSampler();
} else {
this.nodesSampler = new ScheduledConnectNodeSampler();
this.nodesSampler = new SimpleNodeSampler();
}
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, nodesSampler);
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, new ScheduledNodeSampler());
// we want the transport service to throw connect exceptions, so we can retry
transportService.throwConnectException(true);
@ -115,7 +119,7 @@ public class TransportClientNodesService extends AbstractComponent {
ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder();
listedNodes = builder.addAll(listedNodes).add(new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress)).build();
}
nodesSampler.run();
nodesSampler.sample();
return this;
}
@ -129,7 +133,7 @@ public class TransportClientNodesService extends AbstractComponent {
}
listedNodes = builder.build();
}
nodesSampler.run();
nodesSampler.sample();
return this;
}
@ -157,8 +161,22 @@ public class TransportClientNodesService extends AbstractComponent {
transportService.disconnectFromNode(listedNode);
}
private class ScheduledConnectNodeSampler implements Runnable {
@Override public synchronized void run() {
interface NodeSampler {
void sample();
}
class ScheduledNodeSampler implements Runnable {
@Override public void run() {
nodesSampler.sample();
if (!closed) {
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
}
}
}
class SimpleNodeSampler implements NodeSampler {
@Override public synchronized void sample() {
if (closed) {
return;
}
@ -188,16 +206,12 @@ public class TransportClientNodesService extends AbstractComponent {
}
}
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
if (!closed) {
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
}
}
}
private class ScheduledSniffNodesSampler implements Runnable {
class SniffNodesSampler implements NodeSampler {
@Override public synchronized void run() {
@Override public synchronized void sample() {
if (closed) {
return;
}
@ -256,7 +270,7 @@ public class TransportClientNodesService extends AbstractComponent {
}
}
// now, make sure we are connected to all the updated nodes
for (Iterator<DiscoveryNode> it = newNodes.iterator(); it.hasNext();) {
for (Iterator<DiscoveryNode> it = newNodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
try {
transportService.connectToNode(node);
@ -266,10 +280,6 @@ public class TransportClientNodesService extends AbstractComponent {
}
}
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
if (!closed) {
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
}
}
}