Transport Client: When adding an address was already added, ignore it, closes #1906.

This commit is contained in:
Shay Banon 2012-05-03 16:11:15 +03:00
parent fe70b51080
commit 8db27cc5bc
4 changed files with 95 additions and 42 deletions

View File

@ -184,6 +184,13 @@ public class TransportClient extends AbstractClient {
return nodesService.connectedNodes(); return nodesService.connectedNodes();
} }
/**
* Returns the listed nodes in the transport client (ones added to it).
*/
public ImmutableList<DiscoveryNode> listedNodes() {
return nodesService.listedNodes();
}
/** /**
* Adds a transport address that will be used to connect to. * Adds a transport address that will be used to connect to.
* <p/> * <p/>
@ -192,7 +199,7 @@ public class TransportClient extends AbstractClient {
* <p/> * <p/>
* <p>In order to get the list of all the current connected nodes, please see {@link #connectedNodes()}. * <p>In order to get the list of all the current connected nodes, please see {@link #connectedNodes()}.
*/ */
public TransportClient addTransportAddress(TransportAddress transportAddress) { public TransportClient addTransportAddress(TransportAddress... transportAddress) {
nodesService.addTransportAddress(transportAddress); nodesService.addTransportAddress(transportAddress);
return this; return this;
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client.transport; package org.elasticsearch.client.transport;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import jsr166y.LinkedTransferQueue; import jsr166y.LinkedTransferQueue;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
@ -42,6 +43,7 @@ import org.elasticsearch.transport.*;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -119,12 +121,37 @@ public class TransportClientNodesService extends AbstractComponent {
return this.nodes; return this.nodes;
} }
public TransportClientNodesService addTransportAddress(TransportAddress transportAddress) { public ImmutableList<DiscoveryNode> listedNodes() {
return this.listedNodes;
}
public TransportClientNodesService addTransportAddress(TransportAddress... transportAddresses) {
synchronized (transportMutex) { synchronized (transportMutex) {
List<TransportAddress> filtered = Lists.newArrayListWithExpectedSize(transportAddresses.length);
for (TransportAddress transportAddress : transportAddresses) {
boolean found = false;
for (DiscoveryNode otherNode : listedNodes) {
if (otherNode.address().equals(transportAddress)) {
found = true;
logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
break;
}
}
if (!found) {
filtered.add(transportAddress);
}
}
if (filtered.isEmpty()) {
return this;
}
ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder(); ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder();
DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress); builder.addAll(listedNodes());
logger.debug("adding address {}", node); for (TransportAddress transportAddress : filtered) {
listedNodes = builder.addAll(listedNodes).add(node).build(); DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress);
logger.debug("adding address [{}]", node);
builder.add(node);
}
listedNodes = builder.build();
} }
nodesSampler.sample(); nodesSampler.sample();
return this; return this;
@ -137,7 +164,7 @@ public class TransportClientNodesService extends AbstractComponent {
if (!otherNode.address().equals(transportAddress)) { if (!otherNode.address().equals(transportAddress)) {
builder.add(otherNode); builder.add(otherNode);
} else { } else {
logger.debug("removing address {}", otherNode); logger.debug("removing address [{}]", otherNode);
} }
} }
listedNodes = builder.build(); listedNodes = builder.build();

View File

@ -142,7 +142,6 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
} }
if (cumulationBuffer) { if (cumulationBuffer) {
assert buffer == this.cumulation;
if (!buffer.readable()) { if (!buffer.readable()) {
this.cumulation = null; this.cumulation = null;
} else if (buffer.readerIndex() > 0) { } else if (buffer.readerIndex() > 0) {

View File

@ -139,6 +139,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private volatile BoundTransportAddress boundAddress; private volatile BoundTransportAddress boundAddress;
private final Object[] connectMutex;
public NettyTransport(ThreadPool threadPool) { public NettyTransport(ThreadPool threadPool) {
this(EMPTY_SETTINGS, threadPool, new NetworkService(EMPTY_SETTINGS)); this(EMPTY_SETTINGS, threadPool, new NetworkService(EMPTY_SETTINGS));
} }
@ -153,6 +155,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
this.threadPool = threadPool; this.threadPool = threadPool;
this.networkService = networkService; this.networkService = networkService;
this.connectMutex = new Object[500];
for (int i = 0; i < connectMutex.length; i++) {
connectMutex[i] = new Object();
}
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors() * 2); this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors() * 2);
this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false))); this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false))); this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));
@ -489,39 +496,41 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (node == null) { if (node == null) {
throw new ConnectTransportException(null, "Can't connect to a null node"); throw new ConnectTransportException(null, "Can't connect to a null node");
} }
try { synchronized (connectLock(node.id())) {
NodeChannels nodeChannels = connectedNodes.get(node); try {
if (nodeChannels != null) { NodeChannels nodeChannels = connectedNodes.get(node);
return; if (nodeChannels != null) {
} return;
}
if (light) { if (light) {
nodeChannels = connectToChannelsLight(node); nodeChannels = connectToChannelsLight(node);
} else { } else {
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]); nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]);
try { try {
connectToChannels(nodeChannels, node); connectToChannels(nodeChannels, node);
} catch (Exception e) { } catch (Exception e) {
nodeChannels.close();
throw e;
}
}
NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels);
if (existing != null) {
// we are already connected to a node, close this ones
nodeChannels.close(); nodeChannels.close();
throw e; } else {
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
transportServiceAdapter.raiseNodeConnected(node);
} }
}
NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels); } catch (ConnectTransportException e) {
if (existing != null) { throw e;
// we are already connected to a node, close this ones } catch (Exception e) {
nodeChannels.close(); throw new ConnectTransportException(node, "General node connection failure", e);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Connected to node [{}]", node);
}
transportServiceAdapter.raiseNodeConnected(node);
} }
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
throw new ConnectTransportException(node, "General node connection failure", e);
} }
} }
@ -620,13 +629,15 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
@Override @Override
public void disconnectFromNode(DiscoveryNode node) { public void disconnectFromNode(DiscoveryNode node) {
NodeChannels nodeChannels = connectedNodes.remove(node); synchronized (connectLock(node.id())) {
if (nodeChannels != null) { NodeChannels nodeChannels = connectedNodes.remove(node);
try { if (nodeChannels != null) {
nodeChannels.close(); try {
} finally { nodeChannels.close();
logger.debug("Disconnected from [{}]", node); } finally {
transportServiceAdapter.raiseNodeDisconnected(node); logger.debug("disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} }
} }
} }
@ -639,6 +650,15 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return nodeChannels.channel(options.type()); return nodeChannels.channel(options.type());
} }
private Object connectLock(String nodeId) {
int hash = nodeId.hashCode();
// abs returns Integer.MIN_VALUE, so we need to protect against it...
if (hash == Integer.MIN_VALUE) {
hash = 0;
}
return connectMutex[Math.abs(hash) % connectMutex.length];
}
private class ChannelCloseListener implements ChannelFutureListener { private class ChannelCloseListener implements ChannelFutureListener {
private final DiscoveryNode node; private final DiscoveryNode node;