close transport client to wait for ongoing samples

the transport client should not be executing at the same time as sampling happens, so connection open/close logic will be properly maintained
This commit is contained in:
Shay Banon 2013-11-09 14:54:50 +01:00
parent fb7a234040
commit ffe5e1861f

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -69,7 +70,7 @@ public class TransportClientNodesService extends AbstractComponent {
// nodes that are added to be discovered
private volatile ImmutableList<DiscoveryNode> listedNodes = ImmutableList.of();
private final Object transportMutex = new Object();
private final Object mutex = new Object();
private volatile ImmutableList<DiscoveryNode> nodes = ImmutableList.of();
@ -129,7 +130,10 @@ public class TransportClientNodesService extends AbstractComponent {
}
public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {
synchronized (transportMutex) {
synchronized (mutex) {
if (closed) {
throw new ElasticSearchIllegalStateException("transport client is closed, can't add an address");
}
List<TransportAddress> filtered = Lists.newArrayListWithExpectedSize(transportAddresses.length);
for (TransportAddress transportAddress : transportAddresses) {
boolean found = false;
@ -155,13 +159,16 @@ public class TransportClientNodesService extends AbstractComponent {
builder.add(node);
}
listedNodes = builder.build();
nodesSampler.sample();
}
nodesSampler.sample();
return this;
}
public TransportClientNodesService removeTransportAddress(TransportAddress transportAddress) {
synchronized (transportMutex) {
synchronized (mutex) {
if (closed) {
throw new ElasticSearchIllegalStateException("transport client is closed, can't remove an address");
}
ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder();
for (DiscoveryNode otherNode : listedNodes) {
if (!otherNode.address().equals(transportAddress)) {
@ -171,8 +178,8 @@ public class TransportClientNodesService extends AbstractComponent {
}
}
listedNodes = builder.build();
nodesSampler.sample();
}
nodesSampler.sample();
return this;
}
@ -262,19 +269,33 @@ public class TransportClientNodesService extends AbstractComponent {
}
public void close() {
closed = true;
nodesSamplerFuture.cancel(true);
for (DiscoveryNode node : nodes) {
transportService.disconnectFromNode(node);
synchronized (mutex) {
if (closed) {
return;
}
closed = true;
nodesSamplerFuture.cancel(true);
for (DiscoveryNode node : nodes) {
transportService.disconnectFromNode(node);
}
for (DiscoveryNode listedNode : listedNodes) {
transportService.disconnectFromNode(listedNode);
}
nodes = ImmutableList.of();
}
for (DiscoveryNode listedNode : listedNodes) {
transportService.disconnectFromNode(listedNode);
}
nodes = ImmutableList.of();
}
interface NodeSampler {
void sample();
abstract class NodeSampler {
public void sample() {
synchronized (mutex) {
if (closed) {
return;
}
doSample();
}
}
protected abstract void doSample();
}
class ScheduledNodeSampler implements Runnable {
@ -291,13 +312,10 @@ public class TransportClientNodesService extends AbstractComponent {
}
}
class SimpleNodeSampler implements NodeSampler {
class SimpleNodeSampler extends NodeSampler {
@Override
public synchronized void sample() {
if (closed) {
return;
}
protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<DiscoveryNode>();
for (DiscoveryNode node : listedNodes) {
if (!transportService.nodeConnected(node)) {
@ -332,14 +350,10 @@ public class TransportClientNodesService extends AbstractComponent {
}
}
class SniffNodesSampler implements NodeSampler {
class SniffNodesSampler extends NodeSampler {
@Override
public synchronized void sample() {
if (closed) {
return;
}
protected void doSample() {
// the nodes we are going to ping include the core listed nodes that were added
// and the last round of discovered nodes
Set<DiscoveryNode> nodesToPing = Sets.newHashSet();