don't establish a connection on demand, only do it when node discovered by the discovery

This commit is contained in:
kimchy 2010-04-18 17:10:09 +03:00
parent 6b7bbfb883
commit 72629fc5ec
17 changed files with 490 additions and 422 deletions

View File

@ -49,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.TimeValue.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class TransportClientNodesService extends AbstractComponent implements ClusterStateListener { public class TransportClientNodesService extends AbstractComponent implements ClusterStateListener {
@ -61,7 +61,8 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
private final ThreadPool threadPool; private final ThreadPool threadPool;
private volatile ImmutableList<TransportAddress> transportAddresses = ImmutableList.of(); // nodes that are added to be discovered
private volatile ImmutableList<DiscoveryNode> listedNodes = ImmutableList.of();
private final Object transportMutex = new Object(); private final Object transportMutex = new Object();
@ -97,7 +98,11 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
} }
public ImmutableList<TransportAddress> transportAddresses() { public ImmutableList<TransportAddress> transportAddresses() {
return this.transportAddresses; ImmutableList.Builder<TransportAddress> lstBuilder = ImmutableList.builder();
for (DiscoveryNode listedNode : listedNodes) {
lstBuilder.add(listedNode.address());
}
return lstBuilder.build();
} }
public ImmutableList<DiscoveryNode> connectedNodes() { public ImmutableList<DiscoveryNode> connectedNodes() {
@ -106,8 +111,8 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
public TransportClientNodesService addTransportAddress(TransportAddress transportAddress) { public TransportClientNodesService addTransportAddress(TransportAddress transportAddress) {
synchronized (transportMutex) { synchronized (transportMutex) {
ImmutableList.Builder<TransportAddress> builder = ImmutableList.builder(); ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder();
transportAddresses = builder.addAll(transportAddresses).add(transportAddress).build(); listedNodes = builder.addAll(listedNodes).add(new DiscoveryNode("#temp#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress)).build();
} }
nodesSampler.run(); nodesSampler.run();
return this; return this;
@ -115,13 +120,13 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
public TransportClientNodesService removeTransportAddress(TransportAddress transportAddress) { public TransportClientNodesService removeTransportAddress(TransportAddress transportAddress) {
synchronized (transportMutex) { synchronized (transportMutex) {
ImmutableList.Builder<TransportAddress> builder = ImmutableList.builder(); ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder();
for (TransportAddress otherTransportAddress : transportAddresses) { for (DiscoveryNode otherNode : listedNodes) {
if (!otherTransportAddress.equals(transportAddress)) { if (!otherNode.address().equals(transportAddress)) {
builder.add(otherTransportAddress); builder.add(otherNode);
} }
} }
transportAddresses = builder.build(); listedNodes = builder.build();
} }
nodesSampler.run(); nodesSampler.run();
return this; return this;
@ -146,32 +151,39 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
public void close() { public void close() {
nodesSamplerFuture.cancel(true); nodesSamplerFuture.cancel(true);
for (DiscoveryNode listedNode : listedNodes)
transportService.disconnectFromNode(listedNode);
} }
@Override public void clusterChanged(ClusterChangedEvent event) { @Override public void clusterChanged(ClusterChangedEvent event) {
transportService.nodesAdded(event.nodesDelta().addedNodes()); for (DiscoveryNode node : event.nodesDelta().addedNodes()) {
try {
transportService.connectToNode(node);
} catch (Exception e) {
logger.warn("Failed to connect to discovered node [" + node + "]", e);
}
}
this.discoveredNodes = event.state().nodes(); this.discoveredNodes = event.state().nodes();
HashSet<DiscoveryNode> newNodes = new HashSet<DiscoveryNode>(nodes); HashSet<DiscoveryNode> newNodes = new HashSet<DiscoveryNode>(nodes);
newNodes.addAll(discoveredNodes.nodes().values()); newNodes.addAll(discoveredNodes.nodes().values());
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build(); nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
transportService.nodesRemoved(event.nodesDelta().removedNodes()); for (DiscoveryNode node : event.nodesDelta().removedNodes()) {
transportService.disconnectFromNode(node);
}
} }
private class ScheduledNodesSampler implements Runnable { private class ScheduledNodesSampler implements Runnable {
@Override public synchronized void run() { @Override public synchronized void run() {
ImmutableList<TransportAddress> transportAddresses = TransportClientNodesService.this.transportAddresses; ImmutableList<DiscoveryNode> listedNodes = TransportClientNodesService.this.listedNodes;
final CountDownLatch latch = new CountDownLatch(transportAddresses.size()); final CountDownLatch latch = new CountDownLatch(listedNodes.size());
final CopyOnWriteArrayList<NodesInfoResponse> nodesInfoResponses = new CopyOnWriteArrayList<NodesInfoResponse>(); final CopyOnWriteArrayList<NodesInfoResponse> nodesInfoResponses = new CopyOnWriteArrayList<NodesInfoResponse>();
final CopyOnWriteArrayList<DiscoveryNode> tempNodes = new CopyOnWriteArrayList<DiscoveryNode>(); for (final DiscoveryNode listedNode : listedNodes) {
for (final TransportAddress transportAddress : transportAddresses) {
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
@Override public void run() { @Override public void run() {
DiscoveryNode tempNode = new DiscoveryNode("#temp#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress);
tempNodes.add(tempNode);
try { try {
transportService.nodesAdded(ImmutableList.of(tempNode)); transportService.connectToNode(listedNode); // make sure we are connected to it
transportService.sendRequest(tempNode, TransportActions.Admin.Cluster.Node.INFO, Requests.nodesInfo("_local"), new BaseTransportResponseHandler<NodesInfoResponse>() { transportService.sendRequest(listedNode, TransportActions.Admin.Cluster.Node.INFO, Requests.nodesInfo("_local"), new BaseTransportResponseHandler<NodesInfoResponse>() {
@Override public NodesInfoResponse newInstance() { @Override public NodesInfoResponse newInstance() {
return new NodesInfoResponse(); return new NodesInfoResponse();
@ -183,12 +195,12 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
} }
@Override public void handleException(RemoteTransportException exp) { @Override public void handleException(RemoteTransportException exp) {
logger.debug("Failed to get node info from " + transportAddress + ", removed from nodes list", exp); logger.debug("Failed to get node info from " + listedNode + ", removed from nodes list", exp);
latch.countDown(); latch.countDown();
} }
}); });
} catch (Exception e) { } catch (Exception e) {
logger.debug("Failed to get node info from " + transportAddress + ", removed from nodes list", e); logger.debug("Failed to get node info from " + listedNode + ", removed from nodes list", e);
latch.countDown(); latch.countDown();
} }
} }
@ -218,9 +230,15 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
if (discoveredNodes != null) { if (discoveredNodes != null) {
newNodes.addAll(discoveredNodes.nodes().values()); newNodes.addAll(discoveredNodes.nodes().values());
} }
// now, make sure we are connected to all the updated nodes
for (DiscoveryNode node : newNodes) {
try {
transportService.connectToNode(node);
} catch (Exception e) {
logger.debug("Failed to connect to discovered node [" + node + "]", e);
}
}
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build(); nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
transportService.nodesRemoved(tempNodes);
} }
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.service;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -180,11 +181,15 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
} }
} }
threadPool.execute(new Runnable() { // TODO, do this in parallel (and wait)
@Override public void run() { for (DiscoveryNode node : nodesDelta.addedNodes()) {
transportService.nodesAdded(nodesDelta.addedNodes()); try {
transportService.connectToNode(node);
} catch (Exception e) {
// TODO, need to mark this node as failed...
logger.warn("Failed to connect to node [" + node + "]", e);
} }
}); }
for (TimeoutHolder timeoutHolder : clusterStateTimeoutListeners) { for (TimeoutHolder timeoutHolder : clusterStateTimeoutListeners) {
timeoutHolder.listener.clusterChanged(clusterChangedEvent); timeoutHolder.listener.clusterChanged(clusterChangedEvent);
@ -195,7 +200,9 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
@Override public void run() { @Override public void run() {
transportService.nodesRemoved(nodesDelta.removedNodes()); for (DiscoveryNode node : nodesDelta.removedNodes()) {
transportService.disconnectFromNode(node);
}
} }
}); });

View File

@ -22,7 +22,7 @@ package org.elasticsearch.index.mapper;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class ParsedDocument { public class ParsedDocument {

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
/**
* An exception indicating that a message is sent to a node that is not connected.
*
* @author kimchy (shay.banon)
*/
public class NodeNotConnectedException extends ConnectTransportException {
public NodeNotConnectedException(DiscoveryNode node, String msg) {
super(node, msg);
}
public NodeNotConnectedException(DiscoveryNode node, String msg, Throwable cause) {
super(node, msg, cause);
}
}

View File

@ -70,9 +70,20 @@ public interface Transport extends LifecycleComponent<Transport> {
*/ */
boolean addressSupported(Class<? extends TransportAddress> address); boolean addressSupported(Class<? extends TransportAddress> address);
void nodesAdded(Iterable<DiscoveryNode> nodes); /**
* Returns <tt>true</tt> if the node is connected.
*/
boolean nodeConnected(DiscoveryNode node);
void nodesRemoved(Iterable<DiscoveryNode> nodes); /**
* Connects to the given node, if already connected, does nothing.
*/
void connectToNode(DiscoveryNode node) throws ConnectTransportException;
/**
* Disconnected from the given node, if not connected, will do nothing.
*/
void disconnectFromNode(DiscoveryNode node);
<T extends Streamable> void sendRequest(DiscoveryNode node, long requestId, String action, <T extends Streamable> void sendRequest(DiscoveryNode node, long requestId, String action,
Streamable message, TransportResponseHandler<T> handler) throws IOException, TransportException; Streamable message, TransportResponseHandler<T> handler) throws IOException, TransportException;

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
/**
* @author kimchy (shay.banon)
*/
public interface TransportConnectionListener {
void onNodeConnected(DiscoveryNode node);
void onNodeDisconnected(DiscoveryNode node);
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.util.transport.BoundTransportAddress;
import org.elasticsearch.util.transport.TransportAddress; import org.elasticsearch.util.transport.TransportAddress;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
@ -51,6 +52,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
final AtomicLong requestIds = new AtomicLong(); final AtomicLong requestIds = new AtomicLong();
final CopyOnWriteArrayList<TransportConnectionListener> connectionListeners = new CopyOnWriteArrayList<TransportConnectionListener>();
private boolean throwConnectException = false; private boolean throwConnectException = false;
public TransportService(Transport transport, ThreadPool threadPool) { public TransportService(Transport transport, ThreadPool threadPool) {
@ -73,6 +76,18 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
@Override public TransportResponseHandler remove(long requestId) { @Override public TransportResponseHandler remove(long requestId) {
return clientHandlers.remove(requestId); return clientHandlers.remove(requestId);
} }
@Override public void raiseNodeConnected(DiscoveryNode node) {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeConnected(node);
}
}
@Override public void raiseNodeDisconnected(DiscoveryNode node) {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeDisconnected(node);
}
}
}); });
transport.start(); transport.start();
if (transport.boundAddress() != null && logger.isInfoEnabled()) { if (transport.boundAddress() != null && logger.isInfoEnabled()) {
@ -96,20 +111,24 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return transport.boundAddress(); return transport.boundAddress();
} }
public void nodesAdded(Iterable<DiscoveryNode> nodes) { public boolean nodeConnected(DiscoveryNode node) {
try { return transport.nodeConnected(node);
transport.nodesAdded(nodes);
} catch (Exception e) {
logger.warn("Failed add nodes [" + nodes + "] to transport", e);
}
} }
public void nodesRemoved(Iterable<DiscoveryNode> nodes) { public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
try { transport.connectToNode(node);
transport.nodesRemoved(nodes); }
} catch (Exception e) {
logger.warn("Failed to remove nodes[" + nodes + "] from transport", e); public void disconnectFromNode(DiscoveryNode node) {
} transport.disconnectFromNode(node);
}
public void addConnectionListener(TransportConnectionListener listener) {
connectionListeners.add(listener);
}
public void removeConnectionListener(TransportConnectionListener listener) {
connectionListeners.remove(listener);
} }
/** /**

View File

@ -19,12 +19,18 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public interface TransportServiceAdapter { public interface TransportServiceAdapter {
TransportRequestHandler handler(String action); TransportRequestHandler handler(String action);
TransportResponseHandler remove(long requestId); TransportResponseHandler remove(long requestId);
void raiseNodeConnected(DiscoveryNode node);
void raiseNodeDisconnected(DiscoveryNode node);
} }

View File

@ -58,6 +58,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap();
public LocalTransport(ThreadPool threadPool) { public LocalTransport(ThreadPool threadPool) {
this(ImmutableSettings.Builder.EMPTY_SETTINGS, threadPool); this(ImmutableSettings.Builder.EMPTY_SETTINGS, threadPool);
} }
@ -92,10 +94,31 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
return boundAddress; return boundAddress;
} }
@Override public void nodesAdded(Iterable<DiscoveryNode> nodes) { @Override public boolean nodeConnected(DiscoveryNode node) {
return connectedNodes.containsKey(node);
} }
@Override public void nodesRemoved(Iterable<DiscoveryNode> nodes) { @Override public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
synchronized (this) {
if (connectedNodes.containsKey(node)) {
return;
}
final LocalTransport targetTransport = transports.get(node.address());
if (targetTransport == null) {
throw new ConnectTransportException(node, "Failed to connect");
}
connectedNodes.put(node, targetTransport);
transportServiceAdapter.raiseNodeConnected(node);
}
}
@Override public void disconnectFromNode(DiscoveryNode node) {
synchronized (this) {
LocalTransport removed = connectedNodes.remove(node);
if (removed != null) {
transportServiceAdapter.raiseNodeDisconnected(node);
}
}
} }
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, @Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action,
@ -110,9 +133,9 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
stream.writeUTF(action); stream.writeUTF(action);
message.writeTo(stream); message.writeTo(stream);
final LocalTransport targetTransport = transports.get(node.address()); final LocalTransport targetTransport = connectedNodes.get(node);
if (targetTransport == null) { if (targetTransport == null) {
throw new ConnectTransportException(node, "Failed to connect"); throw new NodeNotConnectedException(node, "Node not connected");
} }
final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();

View File

@ -116,7 +116,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private volatile ServerBootstrap serverBootstrap; private volatile ServerBootstrap serverBootstrap;
// node id to actual channel // node id to actual channel
final ConcurrentMap<String, NodeConnections> clientChannels = newConcurrentMap(); final ConcurrentMap<String, NodeConnections> connectedNodes = newConcurrentMap();
private volatile Channel serverChannel; private volatile Channel serverChannel;
@ -297,7 +297,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
serverBootstrap = null; serverBootstrap = null;
} }
for (Iterator<NodeConnections> it = clientChannels.values().iterator(); it.hasNext();) { for (Iterator<NodeConnections> it = connectedNodes.values().iterator(); it.hasNext();) {
NodeConnections nodeConnections = it.next(); NodeConnections nodeConnections = it.next();
it.remove(); it.remove();
nodeConnections.close(); nodeConnections.close();
@ -309,7 +309,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
ScheduledFuture<?> scheduledFuture = threadPool.schedule(new Runnable() { ScheduledFuture<?> scheduledFuture = threadPool.schedule(new Runnable() {
@Override public void run() { @Override public void run() {
try { try {
for (Iterator<NodeConnections> it = clientChannels.values().iterator(); it.hasNext();) { for (Iterator<NodeConnections> it = connectedNodes.values().iterator(); it.hasNext();) {
NodeConnections nodeConnections = it.next(); NodeConnections nodeConnections = it.next();
it.remove(); it.remove();
nodeConnections.close(); nodeConnections.close();
@ -391,106 +391,115 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// }); // });
} }
@Override public void nodesAdded(Iterable<DiscoveryNode> nodes) { @Override public boolean nodeConnected(DiscoveryNode node) {
return connectedNodes.containsKey(node.id());
}
@Override public void connectToNode(DiscoveryNode node) {
if (!lifecycle.started()) { if (!lifecycle.started()) {
throw new ElasticSearchIllegalStateException("Can't add nodes to a stopped transport"); throw new ElasticSearchIllegalStateException("Can't add nodes to a stopped transport");
} }
for (DiscoveryNode node : nodes) { try {
try { if (node == null) {
nodeChannel(node); throw new ConnectTransportException(node, "Can't connect to a null node");
} catch (Exception e) {
logger.warn("Failed to connect to discovered node [" + node + "]", e);
} }
NodeConnections nodeConnections = connectedNodes.get(node.id());
if (nodeConnections != null) {
return;
}
synchronized (this) {
// recheck here, within the sync block (we cache connections, so we don't care about this single sync block)
nodeConnections = connectedNodes.get(node.id());
if (nodeConnections != null) {
return;
}
// build connection(s) to the node
ArrayList<Channel> channels = new ArrayList<Channel>();
Throwable lastConnectException = null;
for (int connectionIndex = 0; connectionIndex < connectionsPerNode; connectionIndex++) {
for (int i = 1; i <= connectRetries; i++) {
if (!lifecycle.started()) {
for (Channel channel1 : channels) {
channel1.close().awaitUninterruptibly();
}
throw new ConnectTransportException(node, "Can't connect when the transport is stopped");
}
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
ChannelFuture channelFuture = clientBootstrap.connect(address);
channelFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25));
if (!channelFuture.isSuccess()) {
// we failed to connect, check if we need to bail or retry
if (i == connectRetries && connectionIndex == 0) {
lastConnectException = channelFuture.getCause();
if (connectionIndex == 0) {
throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "]", lastConnectException);
} else {
// break out of the retry loop, try another connection
break;
}
} else {
logger.trace("Retry #[" + i + "], connect to [" + node + "]");
try {
channelFuture.getChannel().close();
} catch (Exception e) {
// ignore
}
continue;
}
}
// we got a connection, add it to our connections
Channel channel = channelFuture.getChannel();
if (!lifecycle.started()) {
channel.close();
for (Channel channel1 : channels) {
channel1.close().awaitUninterruptibly();
}
throw new ConnectTransportException(node, "Can't connect when the transport is stopped");
}
channel.getCloseFuture().addListener(new ChannelCloseListener(node.id()));
channels.add(channel);
break;
}
}
if (channels.isEmpty()) {
if (lastConnectException != null) {
throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "]", lastConnectException);
}
throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "], reason unknown");
}
if (logger.isDebugEnabled()) {
logger.debug("Connected to node[{}], number_of_connections[{}]", node, channels.size());
}
connectedNodes.put(node.id(), new NodeConnections(node, channels.toArray(new Channel[channels.size()])));
transportServiceAdapter.raiseNodeConnected(node);
}
} catch (Exception e) {
throw new ConnectTransportException(node, "General node connection failure", e);
} }
} }
@Override public void nodesRemoved(Iterable<DiscoveryNode> nodes) { @Override public void disconnectFromNode(DiscoveryNode node) {
for (DiscoveryNode node : nodes) { NodeConnections nodeConnections = connectedNodes.remove(node.id());
NodeConnections nodeConnections = clientChannels.remove(node.id()); if (nodeConnections != null) {
if (nodeConnections != null) { nodeConnections.close();
nodeConnections.close();
}
} }
} }
private Channel nodeChannel(DiscoveryNode node) throws ConnectTransportException { private Channel nodeChannel(DiscoveryNode node) throws ConnectTransportException {
if (node == null) { NettyTransport.NodeConnections nodeConnections = connectedNodes.get(node.id());
throw new ConnectTransportException(node, "Can't connect to a null node"); if (nodeConnections == null) {
throw new NodeNotConnectedException(node, "Node not connected");
} }
NodeConnections nodeConnections = clientChannels.get(node.id()); Channel channel = nodeConnections.channel();
if (nodeConnections != null) { if (channel == null) {
return nodeConnections.channel(); throw new NodeNotConnectedException(node, "Node not connected");
} }
synchronized (this) { return channel;
// recheck here, within the sync block (we cache connections, so we don't care about this single sync block)
nodeConnections = clientChannels.get(node.id());
if (nodeConnections != null) {
return nodeConnections.channel();
}
// build connection(s) to the node
ArrayList<Channel> channels = new ArrayList<Channel>();
Throwable lastConnectException = null;
for (int connectionIndex = 0; connectionIndex < connectionsPerNode; connectionIndex++) {
for (int i = 1; i <= connectRetries; i++) {
if (!lifecycle.started()) {
for (Channel channel1 : channels) {
channel1.close().awaitUninterruptibly();
}
throw new ConnectTransportException(node, "Can't connect when the transport is stopped");
}
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
ChannelFuture channelFuture = clientBootstrap.connect(address);
channelFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25));
if (!channelFuture.isSuccess()) {
// we failed to connect, check if we need to bail or retry
if (i == connectRetries && connectionIndex == 0) {
lastConnectException = channelFuture.getCause();
if (connectionIndex == 0) {
throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "]", lastConnectException);
} else {
// break out of the retry loop, try another connection
break;
}
} else {
logger.trace("Retry #[" + i + "], connect to [" + node + "]");
try {
channelFuture.getChannel().close();
} catch (Exception e) {
// ignore
}
continue;
}
}
// we got a connection, add it to our connections
Channel channel = channelFuture.getChannel();
if (!lifecycle.started()) {
channel.close();
for (Channel channel1 : channels) {
channel1.close().awaitUninterruptibly();
}
throw new ConnectTransportException(node, "Can't connect when the transport is stopped");
}
channel.getCloseFuture().addListener(new ChannelCloseListener(node.id()));
channels.add(channel);
break;
}
}
if (channels.isEmpty()) {
if (lastConnectException != null) {
throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "]", lastConnectException);
}
throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "], reason unknown");
}
if (logger.isDebugEnabled()) {
logger.debug("Connected to node[{}], number_of_connections[{}]", node, channels.size());
}
clientChannels.put(node.id(), new NodeConnections(channels.toArray(new Channel[channels.size()])));
}
return clientChannels.get(node.id()).channel();
} }
private static class NodeConnections { public class NodeConnections {
private final DiscoveryNode node;
private final AtomicInteger counter = new AtomicInteger(); private final AtomicInteger counter = new AtomicInteger();
@ -498,7 +507,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private volatile boolean closed = false; private volatile boolean closed = false;
private NodeConnections(Channel[] channels) { private NodeConnections(DiscoveryNode node, Channel[] channels) {
this.node = node;
this.channels = channels; this.channels = channels;
} }
@ -532,6 +542,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
channel.close().awaitUninterruptibly(); channel.close().awaitUninterruptibly();
} }
} }
transportServiceAdapter.raiseNodeDisconnected(node);
} }
} }
@ -544,13 +555,15 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
} }
@Override public void operationComplete(ChannelFuture future) throws Exception { @Override public void operationComplete(ChannelFuture future) throws Exception {
final NodeConnections nodeConnections = clientChannels.get(nodeId); final NodeConnections nodeConnections = connectedNodes.get(nodeId);
if (nodeConnections != null) { if (nodeConnections != null) {
nodeConnections.channelClosed(future.getChannel()); nodeConnections.channelClosed(future.getChannel());
if (nodeConnections.numberOfChannels() == 0) { if (nodeConnections.numberOfChannels() == 0) {
// all the channels in the node connections are closed, remove it from // all the channels in the node connections are closed, remove it from
// our client channels // our client channels
clientChannels.remove(nodeId); connectedNodes.remove(nodeId);
// and close it
nodeConnections.close();
} }
} }
} }

View File

@ -37,7 +37,7 @@ public class NettyTransportManagement {
@ManagedAttribute(description = "Number of connections this node has to other nodes") @ManagedAttribute(description = "Number of connections this node has to other nodes")
public long getNumberOfOutboundConnections() { public long getNumberOfOutboundConnections() {
return transport.clientChannels.size(); return transport.connectedNodes.size();
} }
@ManagedAttribute(description = "Number if IO worker threads") @ManagedAttribute(description = "Number if IO worker threads")

View File

@ -71,12 +71,8 @@ public class InetSocketTransportAddress implements TransportAddress {
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
InetSocketTransportAddress address1 = (InetSocketTransportAddress) o; InetSocketTransportAddress address1 = (InetSocketTransportAddress) o;
return address.equals(address1.address);
if (address != null ? !address.equals(address1.address) : address1.address != null) return false;
return true;
} }
@Override @Override

View File

@ -0,0 +1,168 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.scaling.ScalingThreadPool;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public abstract class AbstractSimpleTransportTests {
protected ThreadPool threadPool;
protected TransportService serviceA;
protected TransportService serviceB;
protected DiscoveryNode serviceANode;
protected DiscoveryNode serviceBNode;
@BeforeMethod public void setUp() {
threadPool = new ScalingThreadPool();
build();
serviceA.connectToNode(serviceBNode);
serviceB.connectToNode(serviceANode);
}
@AfterMethod public void tearDown() {
serviceA.close();
serviceB.close();
threadPool.shutdown();
}
protected abstract void build();
@Test public void testHelloWorld() {
serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessage("hello " + request.message));
} catch (IOException e) {
e.printStackTrace();
assertThat(e.getMessage(), false, equalTo(true));
}
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHello",
new StringMessage("moshe"), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
System.out.println("got response: " + response.message);
assertThat("hello moshe", equalTo(response.message));
}
@Override public void handleException(RemoteTransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
try {
StringMessage message = res.get();
assertThat("hello moshe", equalTo(message.message));
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
}
System.out.println("after ...");
}
@Test public void testErrorMessage() {
serviceA.registerHandler("sayHelloException", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) throws Exception {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
throw new RuntimeException("bad message !!!");
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloException",
new StringMessage("moshe"), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
assertThat("got response instead of exception", false, equalTo(true));
}
@Override public void handleException(RemoteTransportException exp) {
assertThat("bad message !!!", equalTo(exp.getCause().getMessage()));
}
});
try {
res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
} catch (Exception e) {
assertThat("bad message !!!", equalTo(e.getCause().getMessage()));
}
System.out.println("after ...");
}
private class StringMessage implements Streamable {
private String message;
private StringMessage(String message) {
this.message = message;
}
private StringMessage() {
}
@Override public void readFrom(StreamInput in) throws IOException {
message = in.readUTF();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(message);
}
}
}

View File

@ -20,149 +20,18 @@
package org.elasticsearch.transport.local; package org.elasticsearch.transport.local;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractSimpleTransportTests;
import org.elasticsearch.threadpool.scaling.ScalingThreadPool; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.io.IOException; @Test
public class SimpleLocalTransportTests extends AbstractSimpleTransportTests {
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
public class SimpleLocalTransportTests {
private ThreadPool threadPool;
private TransportService serviceA;
private TransportService serviceB;
private DiscoveryNode serviceANode;
private DiscoveryNode serviceBNode;
@BeforeClass public void setUp() {
threadPool = new ScalingThreadPool();
@Override protected void build() {
serviceA = new TransportService(new LocalTransport(threadPool), threadPool).start(); serviceA = new TransportService(new LocalTransport(threadPool), threadPool).start();
serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress()); serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(new LocalTransport(threadPool), threadPool).start(); serviceB = new TransportService(new LocalTransport(threadPool), threadPool).start();
serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress()); serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
} }
@AfterClass public void tearDown() {
serviceA.close();
serviceB.close();
threadPool.shutdown();
}
@Test public void testHelloWorld() {
serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessage("hello " + request.message));
} catch (IOException e) {
e.printStackTrace();
assertThat(e.getMessage(), false, equalTo(true));
}
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHello",
new StringMessage("moshe"), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
System.out.println("got response: " + response.message);
assertThat("hello moshe", equalTo(response.message));
}
@Override public void handleException(RemoteTransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
try {
StringMessage message = res.get();
assertThat("hello moshe", equalTo(message.message));
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
}
System.out.println("after ...");
}
@Test public void testErrorMessage() {
serviceA.registerHandler("sayHelloException", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) throws Exception {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
throw new RuntimeException("bad message !!!");
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloException",
new StringMessage("moshe"), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
assertThat("got response instead of exception", false, equalTo(true));
}
@Override public void handleException(RemoteTransportException exp) {
assertThat("bad message !!!", equalTo(exp.getCause().getMessage()));
}
});
try {
res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
} catch (Exception e) {
assertThat("bad message !!!", equalTo(e.getCause().getMessage()));
}
System.out.println("after ...");
}
private class StringMessage implements Streamable {
private String message;
private StringMessage(String message) {
this.message = message;
}
private StringMessage() {
}
@Override public void readFrom(StreamInput in) throws IOException {
message = in.readUTF();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(message);
}
}
} }

View File

@ -20,149 +20,18 @@
package org.elasticsearch.transport.netty; package org.elasticsearch.transport.netty;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractSimpleTransportTests;
import org.elasticsearch.threadpool.scaling.ScalingThreadPool; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.io.IOException; @Test
public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
public class SimpleNettyTransportTests {
private ThreadPool threadPool;
private TransportService serviceA;
private TransportService serviceB;
private DiscoveryNode serviceANode;
private DiscoveryNode serviceBNode;
@BeforeClass public void setUp() {
threadPool = new ScalingThreadPool();
@Override protected void build() {
serviceA = new TransportService(new NettyTransport(threadPool), threadPool).start(); serviceA = new TransportService(new NettyTransport(threadPool), threadPool).start();
serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress()); serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(new NettyTransport(threadPool), threadPool).start(); serviceB = new TransportService(new NettyTransport(threadPool), threadPool).start();
serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress()); serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
} }
@AfterClass public void tearDown() {
serviceA.close();
serviceB.close();
threadPool.shutdown();
}
@Test public void testHelloWorld() {
serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessage("hello " + request.message));
} catch (IOException e) {
e.printStackTrace();
assertThat(e.getMessage(), false, equalTo(true));
}
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHello",
new StringMessage("moshe"), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
System.out.println("got response: " + response.message);
assertThat("hello moshe", equalTo(response.message));
}
@Override public void handleException(RemoteTransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
try {
StringMessage message = res.get();
assertThat("hello moshe", equalTo(message.message));
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
}
System.out.println("after ...");
}
@Test public void testErrorMessage() {
serviceA.registerHandler("sayHelloException", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) throws Exception {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
throw new RuntimeException("bad message !!!");
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloException",
new StringMessage("moshe"), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
assertThat("got response instead of exception", false, equalTo(true));
}
@Override public void handleException(RemoteTransportException exp) {
assertThat("bad message !!!", equalTo(exp.getCause().getMessage()));
}
});
try {
res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
} catch (Exception e) {
assertThat("bad message !!!", equalTo(e.getCause().getMessage()));
}
System.out.println("after ...");
}
private class StringMessage implements Streamable {
private String message;
private StringMessage(String message) {
this.message = message;
}
private StringMessage() {
}
@Override public void readFrom(StreamInput in) throws IOException {
message = in.readUTF();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(message);
}
}
} }

View File

@ -19,7 +19,6 @@
package org.elasticsearch.transport.netty.benchmark; package org.elasticsearch.transport.netty.benchmark;
import com.google.common.collect.Lists;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool; import org.elasticsearch.threadpool.cached.CachedThreadPool;
@ -62,7 +61,7 @@ public class BenchmarkNettyClient {
final DiscoveryNode node = new DiscoveryNode("server", new InetSocketTransportAddress("localhost", 9999)); final DiscoveryNode node = new DiscoveryNode("server", new InetSocketTransportAddress("localhost", 9999));
transportService.nodesAdded(Lists.newArrayList(node)); transportService.connectToNode(node);
Thread[] clients = new Thread[NUMBER_OF_CLIENTS]; Thread[] clients = new Thread[NUMBER_OF_CLIENTS];

View File

@ -41,9 +41,9 @@ public class TransportClientDocumentActionsTests extends DocumentActionsTests {
} }
@Override protected Client getClient2() { @Override protected Client getClient2() {
TransportAddress server1Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress(); TransportAddress server2Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
TransportClient client = new TransportClient(settingsBuilder().put("discovery.enabled", false).build()); TransportClient client = new TransportClient(settingsBuilder().put("discovery.enabled", false).build());
client.addTransportAddress(server1Address); client.addTransportAddress(server2Address);
return client; return client;
} }
} }