From 72629fc5ec60edd6f8159a2f02bbc14c41ae579d Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 18 Apr 2010 17:10:09 +0300 Subject: [PATCH] don't establish a connection on demand, only do it when node discovered by the discovery --- .../TransportClientNodesService.java | 66 +++--- .../service/InternalClusterService.java | 17 +- .../index/mapper/ParsedDocument.java | 2 +- .../transport/NodeNotConnectedException.java | 38 ++++ .../elasticsearch/transport/Transport.java | 15 +- .../TransportConnectionListener.java | 32 +++ .../transport/TransportService.java | 43 ++-- .../transport/TransportServiceAdapter.java | 8 +- .../transport/local/LocalTransport.java | 31 ++- .../transport/netty/NettyTransport.java | 195 ++++++++++-------- .../netty/NettyTransportManagement.java | 2 +- .../transport/InetSocketTransportAddress.java | 6 +- .../AbstractSimpleTransportTests.java | 168 +++++++++++++++ .../local/SimpleLocalTransportTests.java | 141 +------------ .../netty/SimpleNettyTransportTests.java | 141 +------------ .../netty/benchmark/BenchmarkNettyClient.java | 3 +- .../TransportClientDocumentActionsTests.java | 4 +- 17 files changed, 490 insertions(+), 422 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/NodeNotConnectedException.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index de32ec5ca6c..b8b61a00e63 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -49,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.util.TimeValue.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class TransportClientNodesService extends AbstractComponent implements ClusterStateListener { @@ -61,7 +61,8 @@ public class TransportClientNodesService extends AbstractComponent implements Cl private final ThreadPool threadPool; - private volatile ImmutableList transportAddresses = ImmutableList.of(); + // nodes that are added to be discovered + private volatile ImmutableList listedNodes = ImmutableList.of(); private final Object transportMutex = new Object(); @@ -97,7 +98,11 @@ public class TransportClientNodesService extends AbstractComponent implements Cl } public ImmutableList transportAddresses() { - return this.transportAddresses; + ImmutableList.Builder lstBuilder = ImmutableList.builder(); + for (DiscoveryNode listedNode : listedNodes) { + lstBuilder.add(listedNode.address()); + } + return lstBuilder.build(); } public ImmutableList connectedNodes() { @@ -106,8 +111,8 @@ public class TransportClientNodesService extends AbstractComponent implements Cl public TransportClientNodesService addTransportAddress(TransportAddress transportAddress) { synchronized (transportMutex) { - ImmutableList.Builder builder = ImmutableList.builder(); - transportAddresses = builder.addAll(transportAddresses).add(transportAddress).build(); + ImmutableList.Builder builder = ImmutableList.builder(); + listedNodes = builder.addAll(listedNodes).add(new DiscoveryNode("#temp#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress)).build(); } nodesSampler.run(); return this; @@ -115,13 +120,13 @@ public class TransportClientNodesService extends AbstractComponent implements Cl public TransportClientNodesService removeTransportAddress(TransportAddress transportAddress) { synchronized (transportMutex) { - ImmutableList.Builder builder = ImmutableList.builder(); - for (TransportAddress otherTransportAddress : transportAddresses) { - if (!otherTransportAddress.equals(transportAddress)) { - builder.add(otherTransportAddress); + ImmutableList.Builder builder = ImmutableList.builder(); + for (DiscoveryNode otherNode : listedNodes) { + if (!otherNode.address().equals(transportAddress)) { + builder.add(otherNode); } } - transportAddresses = builder.build(); + listedNodes = builder.build(); } nodesSampler.run(); return this; @@ -146,32 +151,39 @@ public class TransportClientNodesService extends AbstractComponent implements Cl public void close() { nodesSamplerFuture.cancel(true); + for (DiscoveryNode listedNode : listedNodes) + transportService.disconnectFromNode(listedNode); } @Override public void clusterChanged(ClusterChangedEvent event) { - transportService.nodesAdded(event.nodesDelta().addedNodes()); + for (DiscoveryNode node : event.nodesDelta().addedNodes()) { + try { + transportService.connectToNode(node); + } catch (Exception e) { + logger.warn("Failed to connect to discovered node [" + node + "]", e); + } + } this.discoveredNodes = event.state().nodes(); HashSet newNodes = new HashSet(nodes); newNodes.addAll(discoveredNodes.nodes().values()); nodes = new ImmutableList.Builder().addAll(newNodes).build(); - transportService.nodesRemoved(event.nodesDelta().removedNodes()); + for (DiscoveryNode node : event.nodesDelta().removedNodes()) { + transportService.disconnectFromNode(node); + } } private class ScheduledNodesSampler implements Runnable { @Override public synchronized void run() { - ImmutableList transportAddresses = TransportClientNodesService.this.transportAddresses; - final CountDownLatch latch = new CountDownLatch(transportAddresses.size()); + ImmutableList listedNodes = TransportClientNodesService.this.listedNodes; + final CountDownLatch latch = new CountDownLatch(listedNodes.size()); final CopyOnWriteArrayList nodesInfoResponses = new CopyOnWriteArrayList(); - final CopyOnWriteArrayList tempNodes = new CopyOnWriteArrayList(); - for (final TransportAddress transportAddress : transportAddresses) { + for (final DiscoveryNode listedNode : listedNodes) { threadPool.execute(new Runnable() { @Override public void run() { - DiscoveryNode tempNode = new DiscoveryNode("#temp#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress); - tempNodes.add(tempNode); try { - transportService.nodesAdded(ImmutableList.of(tempNode)); - transportService.sendRequest(tempNode, TransportActions.Admin.Cluster.Node.INFO, Requests.nodesInfo("_local"), new BaseTransportResponseHandler() { + transportService.connectToNode(listedNode); // make sure we are connected to it + transportService.sendRequest(listedNode, TransportActions.Admin.Cluster.Node.INFO, Requests.nodesInfo("_local"), new BaseTransportResponseHandler() { @Override public NodesInfoResponse newInstance() { return new NodesInfoResponse(); @@ -183,12 +195,12 @@ public class TransportClientNodesService extends AbstractComponent implements Cl } @Override public void handleException(RemoteTransportException exp) { - logger.debug("Failed to get node info from " + transportAddress + ", removed from nodes list", exp); + logger.debug("Failed to get node info from " + listedNode + ", removed from nodes list", exp); latch.countDown(); } }); } catch (Exception e) { - logger.debug("Failed to get node info from " + transportAddress + ", removed from nodes list", e); + logger.debug("Failed to get node info from " + listedNode + ", removed from nodes list", e); latch.countDown(); } } @@ -218,9 +230,15 @@ public class TransportClientNodesService extends AbstractComponent implements Cl if (discoveredNodes != null) { newNodes.addAll(discoveredNodes.nodes().values()); } + // now, make sure we are connected to all the updated nodes + for (DiscoveryNode node : newNodes) { + try { + transportService.connectToNode(node); + } catch (Exception e) { + logger.debug("Failed to connect to discovered node [" + node + "]", e); + } + } nodes = new ImmutableList.Builder().addAll(newNodes).build(); - - transportService.nodesRemoved(tempNodes); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 5fb356c7a39..3efbc68f92f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.service; import com.google.inject.Inject; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.threadpool.ThreadPool; @@ -180,11 +181,15 @@ public class InternalClusterService extends AbstractLifecycleComponent { */ boolean addressSupported(Class address); - void nodesAdded(Iterable nodes); + /** + * Returns true if the node is connected. + */ + boolean nodeConnected(DiscoveryNode node); - void nodesRemoved(Iterable nodes); + /** + * Connects to the given node, if already connected, does nothing. + */ + void connectToNode(DiscoveryNode node) throws ConnectTransportException; + + /** + * Disconnected from the given node, if not connected, will do nothing. + */ + void disconnectFromNode(DiscoveryNode node); void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message, TransportResponseHandler handler) throws IOException, TransportException; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java new file mode 100644 index 00000000000..371457b0e0a --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.node.DiscoveryNode; + +/** + * @author kimchy (shay.banon) + */ +public interface TransportConnectionListener { + + void onNodeConnected(DiscoveryNode node); + + void onNodeDisconnected(DiscoveryNode node); +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index eff6a252af6..cc517d0c8df 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -31,6 +31,7 @@ import org.elasticsearch.util.transport.BoundTransportAddress; import org.elasticsearch.util.transport.TransportAddress; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; @@ -51,6 +52,8 @@ public class TransportService extends AbstractLifecycleComponent connectionListeners = new CopyOnWriteArrayList(); + private boolean throwConnectException = false; public TransportService(Transport transport, ThreadPool threadPool) { @@ -73,6 +76,18 @@ public class TransportService extends AbstractLifecycleComponent nodes) { - try { - transport.nodesAdded(nodes); - } catch (Exception e) { - logger.warn("Failed add nodes [" + nodes + "] to transport", e); - } + public boolean nodeConnected(DiscoveryNode node) { + return transport.nodeConnected(node); } - public void nodesRemoved(Iterable nodes) { - try { - transport.nodesRemoved(nodes); - } catch (Exception e) { - logger.warn("Failed to remove nodes[" + nodes + "] from transport", e); - } + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + transport.connectToNode(node); + } + + public void disconnectFromNode(DiscoveryNode node) { + transport.disconnectFromNode(node); + } + + public void addConnectionListener(TransportConnectionListener listener) { + connectionListeners.add(listener); + } + + public void removeConnectionListener(TransportConnectionListener listener) { + connectionListeners.remove(listener); } /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java index 63810ccdc74..9dee61a84c4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java @@ -19,12 +19,18 @@ package org.elasticsearch.transport; +import org.elasticsearch.cluster.node.DiscoveryNode; + /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public interface TransportServiceAdapter { TransportRequestHandler handler(String action); TransportResponseHandler remove(long requestId); + + void raiseNodeConnected(DiscoveryNode node); + + void raiseNodeDisconnected(DiscoveryNode node); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 31a9e41c1d2..484d2025c78 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -58,6 +58,8 @@ public class LocalTransport extends AbstractLifecycleComponent implem private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); + private final ConcurrentMap connectedNodes = newConcurrentMap(); + public LocalTransport(ThreadPool threadPool) { this(ImmutableSettings.Builder.EMPTY_SETTINGS, threadPool); } @@ -92,10 +94,31 @@ public class LocalTransport extends AbstractLifecycleComponent implem return boundAddress; } - @Override public void nodesAdded(Iterable nodes) { + @Override public boolean nodeConnected(DiscoveryNode node) { + return connectedNodes.containsKey(node); } - @Override public void nodesRemoved(Iterable nodes) { + @Override public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + synchronized (this) { + if (connectedNodes.containsKey(node)) { + return; + } + final LocalTransport targetTransport = transports.get(node.address()); + if (targetTransport == null) { + throw new ConnectTransportException(node, "Failed to connect"); + } + connectedNodes.put(node, targetTransport); + transportServiceAdapter.raiseNodeConnected(node); + } + } + + @Override public void disconnectFromNode(DiscoveryNode node) { + synchronized (this) { + LocalTransport removed = connectedNodes.remove(node); + if (removed != null) { + transportServiceAdapter.raiseNodeDisconnected(node); + } + } } @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, @@ -110,9 +133,9 @@ public class LocalTransport extends AbstractLifecycleComponent implem stream.writeUTF(action); message.writeTo(stream); - final LocalTransport targetTransport = transports.get(node.address()); + final LocalTransport targetTransport = connectedNodes.get(node); if (targetTransport == null) { - throw new ConnectTransportException(node, "Failed to connect"); + throw new NodeNotConnectedException(node, "Node not connected"); } final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index c23aa2baae4..d887f952543 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -116,7 +116,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem private volatile ServerBootstrap serverBootstrap; // node id to actual channel - final ConcurrentMap clientChannels = newConcurrentMap(); + final ConcurrentMap connectedNodes = newConcurrentMap(); private volatile Channel serverChannel; @@ -297,7 +297,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem serverBootstrap = null; } - for (Iterator it = clientChannels.values().iterator(); it.hasNext();) { + for (Iterator it = connectedNodes.values().iterator(); it.hasNext();) { NodeConnections nodeConnections = it.next(); it.remove(); nodeConnections.close(); @@ -309,7 +309,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem ScheduledFuture scheduledFuture = threadPool.schedule(new Runnable() { @Override public void run() { try { - for (Iterator it = clientChannels.values().iterator(); it.hasNext();) { + for (Iterator it = connectedNodes.values().iterator(); it.hasNext();) { NodeConnections nodeConnections = it.next(); it.remove(); nodeConnections.close(); @@ -391,106 +391,115 @@ public class NettyTransport extends AbstractLifecycleComponent implem // }); } - @Override public void nodesAdded(Iterable nodes) { + @Override public boolean nodeConnected(DiscoveryNode node) { + return connectedNodes.containsKey(node.id()); + } + + @Override public void connectToNode(DiscoveryNode node) { if (!lifecycle.started()) { throw new ElasticSearchIllegalStateException("Can't add nodes to a stopped transport"); } - for (DiscoveryNode node : nodes) { - try { - nodeChannel(node); - } catch (Exception e) { - logger.warn("Failed to connect to discovered node [" + node + "]", e); + try { + if (node == null) { + throw new ConnectTransportException(node, "Can't connect to a null node"); } + NodeConnections nodeConnections = connectedNodes.get(node.id()); + if (nodeConnections != null) { + return; + } + synchronized (this) { + // recheck here, within the sync block (we cache connections, so we don't care about this single sync block) + nodeConnections = connectedNodes.get(node.id()); + if (nodeConnections != null) { + return; + } + // build connection(s) to the node + ArrayList channels = new ArrayList(); + Throwable lastConnectException = null; + for (int connectionIndex = 0; connectionIndex < connectionsPerNode; connectionIndex++) { + for (int i = 1; i <= connectRetries; i++) { + if (!lifecycle.started()) { + for (Channel channel1 : channels) { + channel1.close().awaitUninterruptibly(); + } + throw new ConnectTransportException(node, "Can't connect when the transport is stopped"); + } + InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address(); + ChannelFuture channelFuture = clientBootstrap.connect(address); + channelFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25)); + if (!channelFuture.isSuccess()) { + // we failed to connect, check if we need to bail or retry + if (i == connectRetries && connectionIndex == 0) { + lastConnectException = channelFuture.getCause(); + if (connectionIndex == 0) { + throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "]", lastConnectException); + } else { + // break out of the retry loop, try another connection + break; + } + } else { + logger.trace("Retry #[" + i + "], connect to [" + node + "]"); + try { + channelFuture.getChannel().close(); + } catch (Exception e) { + // ignore + } + continue; + } + } + // we got a connection, add it to our connections + Channel channel = channelFuture.getChannel(); + if (!lifecycle.started()) { + channel.close(); + for (Channel channel1 : channels) { + channel1.close().awaitUninterruptibly(); + } + throw new ConnectTransportException(node, "Can't connect when the transport is stopped"); + } + channel.getCloseFuture().addListener(new ChannelCloseListener(node.id())); + channels.add(channel); + break; + } + } + if (channels.isEmpty()) { + if (lastConnectException != null) { + throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "]", lastConnectException); + } + throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "], reason unknown"); + } + if (logger.isDebugEnabled()) { + logger.debug("Connected to node[{}], number_of_connections[{}]", node, channels.size()); + } + connectedNodes.put(node.id(), new NodeConnections(node, channels.toArray(new Channel[channels.size()]))); + transportServiceAdapter.raiseNodeConnected(node); + } + } catch (Exception e) { + throw new ConnectTransportException(node, "General node connection failure", e); } } - @Override public void nodesRemoved(Iterable nodes) { - for (DiscoveryNode node : nodes) { - NodeConnections nodeConnections = clientChannels.remove(node.id()); - if (nodeConnections != null) { - nodeConnections.close(); - } + @Override public void disconnectFromNode(DiscoveryNode node) { + NodeConnections nodeConnections = connectedNodes.remove(node.id()); + if (nodeConnections != null) { + nodeConnections.close(); } } private Channel nodeChannel(DiscoveryNode node) throws ConnectTransportException { - if (node == null) { - throw new ConnectTransportException(node, "Can't connect to a null node"); + NettyTransport.NodeConnections nodeConnections = connectedNodes.get(node.id()); + if (nodeConnections == null) { + throw new NodeNotConnectedException(node, "Node not connected"); } - NodeConnections nodeConnections = clientChannels.get(node.id()); - if (nodeConnections != null) { - return nodeConnections.channel(); + Channel channel = nodeConnections.channel(); + if (channel == null) { + throw new NodeNotConnectedException(node, "Node not connected"); } - synchronized (this) { - // recheck here, within the sync block (we cache connections, so we don't care about this single sync block) - nodeConnections = clientChannels.get(node.id()); - if (nodeConnections != null) { - return nodeConnections.channel(); - } - // build connection(s) to the node - ArrayList channels = new ArrayList(); - Throwable lastConnectException = null; - for (int connectionIndex = 0; connectionIndex < connectionsPerNode; connectionIndex++) { - for (int i = 1; i <= connectRetries; i++) { - if (!lifecycle.started()) { - for (Channel channel1 : channels) { - channel1.close().awaitUninterruptibly(); - } - throw new ConnectTransportException(node, "Can't connect when the transport is stopped"); - } - InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address(); - ChannelFuture channelFuture = clientBootstrap.connect(address); - channelFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25)); - if (!channelFuture.isSuccess()) { - // we failed to connect, check if we need to bail or retry - if (i == connectRetries && connectionIndex == 0) { - lastConnectException = channelFuture.getCause(); - if (connectionIndex == 0) { - throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "]", lastConnectException); - } else { - // break out of the retry loop, try another connection - break; - } - } else { - logger.trace("Retry #[" + i + "], connect to [" + node + "]"); - try { - channelFuture.getChannel().close(); - } catch (Exception e) { - // ignore - } - continue; - } - } - // we got a connection, add it to our connections - Channel channel = channelFuture.getChannel(); - if (!lifecycle.started()) { - channel.close(); - for (Channel channel1 : channels) { - channel1.close().awaitUninterruptibly(); - } - throw new ConnectTransportException(node, "Can't connect when the transport is stopped"); - } - channel.getCloseFuture().addListener(new ChannelCloseListener(node.id())); - channels.add(channel); - break; - } - } - if (channels.isEmpty()) { - if (lastConnectException != null) { - throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "]", lastConnectException); - } - throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "], reason unknown"); - } - if (logger.isDebugEnabled()) { - logger.debug("Connected to node[{}], number_of_connections[{}]", node, channels.size()); - } - clientChannels.put(node.id(), new NodeConnections(channels.toArray(new Channel[channels.size()]))); - } - - return clientChannels.get(node.id()).channel(); + return channel; } - private static class NodeConnections { + public class NodeConnections { + + private final DiscoveryNode node; private final AtomicInteger counter = new AtomicInteger(); @@ -498,7 +507,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem private volatile boolean closed = false; - private NodeConnections(Channel[] channels) { + private NodeConnections(DiscoveryNode node, Channel[] channels) { + this.node = node; this.channels = channels; } @@ -532,6 +542,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem channel.close().awaitUninterruptibly(); } } + transportServiceAdapter.raiseNodeDisconnected(node); } } @@ -544,13 +555,15 @@ public class NettyTransport extends AbstractLifecycleComponent implem } @Override public void operationComplete(ChannelFuture future) throws Exception { - final NodeConnections nodeConnections = clientChannels.get(nodeId); + final NodeConnections nodeConnections = connectedNodes.get(nodeId); if (nodeConnections != null) { nodeConnections.channelClosed(future.getChannel()); if (nodeConnections.numberOfChannels() == 0) { // all the channels in the node connections are closed, remove it from // our client channels - clientChannels.remove(nodeId); + connectedNodes.remove(nodeId); + // and close it + nodeConnections.close(); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportManagement.java index 6496942e91b..edf27c3f3d0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportManagement.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportManagement.java @@ -37,7 +37,7 @@ public class NettyTransportManagement { @ManagedAttribute(description = "Number of connections this node has to other nodes") public long getNumberOfOutboundConnections() { - return transport.clientChannels.size(); + return transport.connectedNodes.size(); } @ManagedAttribute(description = "Number if IO worker threads") diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/InetSocketTransportAddress.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/InetSocketTransportAddress.java index 2870e42ed96..78b6dfdbe08 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/InetSocketTransportAddress.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/InetSocketTransportAddress.java @@ -71,12 +71,8 @@ public class InetSocketTransportAddress implements TransportAddress { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - InetSocketTransportAddress address1 = (InetSocketTransportAddress) o; - - if (address != null ? !address.equals(address1.address) : address1.address != null) return false; - - return true; + return address.equals(address1.address); } @Override diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java new file mode 100644 index 00000000000..f49e17381a4 --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java @@ -0,0 +1,168 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.scaling.ScalingThreadPool; +import org.elasticsearch.util.io.stream.StreamInput; +import org.elasticsearch.util.io.stream.StreamOutput; +import org.elasticsearch.util.io.stream.Streamable; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; + +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author kimchy (shay.banon) + */ +public abstract class AbstractSimpleTransportTests { + + protected ThreadPool threadPool; + + protected TransportService serviceA; + protected TransportService serviceB; + protected DiscoveryNode serviceANode; + protected DiscoveryNode serviceBNode; + + @BeforeMethod public void setUp() { + threadPool = new ScalingThreadPool(); + build(); + serviceA.connectToNode(serviceBNode); + serviceB.connectToNode(serviceANode); + } + + @AfterMethod public void tearDown() { + serviceA.close(); + serviceB.close(); + + threadPool.shutdown(); + } + + protected abstract void build(); + + @Test public void testHelloWorld() { + serviceA.registerHandler("sayHello", new BaseTransportRequestHandler() { + @Override public StringMessage newInstance() { + return new StringMessage(); + } + + @Override public void messageReceived(StringMessage request, TransportChannel channel) { + System.out.println("got message: " + request.message); + assertThat("moshe", equalTo(request.message)); + try { + channel.sendResponse(new StringMessage("hello " + request.message)); + } catch (IOException e) { + e.printStackTrace(); + assertThat(e.getMessage(), false, equalTo(true)); + } + } + }); + + TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", + new StringMessage("moshe"), new BaseTransportResponseHandler() { + @Override public StringMessage newInstance() { + return new StringMessage(); + } + + @Override public void handleResponse(StringMessage response) { + System.out.println("got response: " + response.message); + assertThat("hello moshe", equalTo(response.message)); + } + + @Override public void handleException(RemoteTransportException exp) { + exp.printStackTrace(); + assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true)); + } + }); + + try { + StringMessage message = res.get(); + assertThat("hello moshe", equalTo(message.message)); + } catch (Exception e) { + assertThat(e.getMessage(), false, equalTo(true)); + } + + System.out.println("after ..."); + } + + @Test public void testErrorMessage() { + serviceA.registerHandler("sayHelloException", new BaseTransportRequestHandler() { + @Override public StringMessage newInstance() { + return new StringMessage(); + } + + @Override public void messageReceived(StringMessage request, TransportChannel channel) throws Exception { + System.out.println("got message: " + request.message); + assertThat("moshe", equalTo(request.message)); + throw new RuntimeException("bad message !!!"); + } + }); + + TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloException", + new StringMessage("moshe"), new BaseTransportResponseHandler() { + @Override public StringMessage newInstance() { + return new StringMessage(); + } + + @Override public void handleResponse(StringMessage response) { + assertThat("got response instead of exception", false, equalTo(true)); + } + + @Override public void handleException(RemoteTransportException exp) { + assertThat("bad message !!!", equalTo(exp.getCause().getMessage())); + } + }); + + try { + res.txGet(); + assertThat("exception should be thrown", false, equalTo(true)); + } catch (Exception e) { + assertThat("bad message !!!", equalTo(e.getCause().getMessage())); + } + + System.out.println("after ..."); + + } + + private class StringMessage implements Streamable { + + private String message; + + private StringMessage(String message) { + this.message = message; + } + + private StringMessage() { + } + + @Override public void readFrom(StreamInput in) throws IOException { + message = in.readUTF(); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(message); + } + } +} diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java index 79e36daaf3e..569619f78f5 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java @@ -20,149 +20,18 @@ package org.elasticsearch.transport.local; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.scaling.ScalingThreadPool; -import org.elasticsearch.transport.*; -import org.elasticsearch.util.io.stream.StreamInput; -import org.elasticsearch.util.io.stream.StreamOutput; -import org.elasticsearch.util.io.stream.Streamable; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.elasticsearch.transport.AbstractSimpleTransportTests; +import org.elasticsearch.transport.TransportService; import org.testng.annotations.Test; -import java.io.IOException; - -import static org.hamcrest.MatcherAssert.*; -import static org.hamcrest.Matchers.*; - -public class SimpleLocalTransportTests { - - private ThreadPool threadPool; - - private TransportService serviceA; - private TransportService serviceB; - private DiscoveryNode serviceANode; - private DiscoveryNode serviceBNode; - - @BeforeClass public void setUp() { - threadPool = new ScalingThreadPool(); +@Test +public class SimpleLocalTransportTests extends AbstractSimpleTransportTests { + @Override protected void build() { serviceA = new TransportService(new LocalTransport(threadPool), threadPool).start(); serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress()); serviceB = new TransportService(new LocalTransport(threadPool), threadPool).start(); serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress()); } - - @AfterClass public void tearDown() { - serviceA.close(); - serviceB.close(); - - threadPool.shutdown(); - } - - @Test public void testHelloWorld() { - serviceA.registerHandler("sayHello", new BaseTransportRequestHandler() { - @Override public StringMessage newInstance() { - return new StringMessage(); - } - - @Override public void messageReceived(StringMessage request, TransportChannel channel) { - System.out.println("got message: " + request.message); - assertThat("moshe", equalTo(request.message)); - try { - channel.sendResponse(new StringMessage("hello " + request.message)); - } catch (IOException e) { - e.printStackTrace(); - assertThat(e.getMessage(), false, equalTo(true)); - } - } - }); - - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", - new StringMessage("moshe"), new BaseTransportResponseHandler() { - @Override public StringMessage newInstance() { - return new StringMessage(); - } - - @Override public void handleResponse(StringMessage response) { - System.out.println("got response: " + response.message); - assertThat("hello moshe", equalTo(response.message)); - } - - @Override public void handleException(RemoteTransportException exp) { - exp.printStackTrace(); - assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true)); - } - }); - - try { - StringMessage message = res.get(); - assertThat("hello moshe", equalTo(message.message)); - } catch (Exception e) { - assertThat(e.getMessage(), false, equalTo(true)); - } - - System.out.println("after ..."); - } - - @Test public void testErrorMessage() { - serviceA.registerHandler("sayHelloException", new BaseTransportRequestHandler() { - @Override public StringMessage newInstance() { - return new StringMessage(); - } - - @Override public void messageReceived(StringMessage request, TransportChannel channel) throws Exception { - System.out.println("got message: " + request.message); - assertThat("moshe", equalTo(request.message)); - throw new RuntimeException("bad message !!!"); - } - }); - - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloException", - new StringMessage("moshe"), new BaseTransportResponseHandler() { - @Override public StringMessage newInstance() { - return new StringMessage(); - } - - @Override public void handleResponse(StringMessage response) { - assertThat("got response instead of exception", false, equalTo(true)); - } - - @Override public void handleException(RemoteTransportException exp) { - assertThat("bad message !!!", equalTo(exp.getCause().getMessage())); - } - }); - - try { - res.txGet(); - assertThat("exception should be thrown", false, equalTo(true)); - } catch (Exception e) { - assertThat("bad message !!!", equalTo(e.getCause().getMessage())); - } - - System.out.println("after ..."); - - } - - private class StringMessage implements Streamable { - - private String message; - - private StringMessage(String message) { - this.message = message; - } - - private StringMessage() { - } - - @Override public void readFrom(StreamInput in) throws IOException { - message = in.readUTF(); - } - - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(message); - } - } - } \ No newline at end of file diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java index c004b914165..70bcd834523 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java @@ -20,149 +20,18 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.scaling.ScalingThreadPool; -import org.elasticsearch.transport.*; -import org.elasticsearch.util.io.stream.StreamInput; -import org.elasticsearch.util.io.stream.StreamOutput; -import org.elasticsearch.util.io.stream.Streamable; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.elasticsearch.transport.AbstractSimpleTransportTests; +import org.elasticsearch.transport.TransportService; import org.testng.annotations.Test; -import java.io.IOException; - -import static org.hamcrest.MatcherAssert.*; -import static org.hamcrest.Matchers.*; - -public class SimpleNettyTransportTests { - - private ThreadPool threadPool; - - private TransportService serviceA; - private TransportService serviceB; - private DiscoveryNode serviceANode; - private DiscoveryNode serviceBNode; - - @BeforeClass public void setUp() { - threadPool = new ScalingThreadPool(); +@Test +public class SimpleNettyTransportTests extends AbstractSimpleTransportTests { + @Override protected void build() { serviceA = new TransportService(new NettyTransport(threadPool), threadPool).start(); serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress()); serviceB = new TransportService(new NettyTransport(threadPool), threadPool).start(); serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress()); } - - @AfterClass public void tearDown() { - serviceA.close(); - serviceB.close(); - - threadPool.shutdown(); - } - - @Test public void testHelloWorld() { - serviceA.registerHandler("sayHello", new BaseTransportRequestHandler() { - @Override public StringMessage newInstance() { - return new StringMessage(); - } - - @Override public void messageReceived(StringMessage request, TransportChannel channel) { - System.out.println("got message: " + request.message); - assertThat("moshe", equalTo(request.message)); - try { - channel.sendResponse(new StringMessage("hello " + request.message)); - } catch (IOException e) { - e.printStackTrace(); - assertThat(e.getMessage(), false, equalTo(true)); - } - } - }); - - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", - new StringMessage("moshe"), new BaseTransportResponseHandler() { - @Override public StringMessage newInstance() { - return new StringMessage(); - } - - @Override public void handleResponse(StringMessage response) { - System.out.println("got response: " + response.message); - assertThat("hello moshe", equalTo(response.message)); - } - - @Override public void handleException(RemoteTransportException exp) { - exp.printStackTrace(); - assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true)); - } - }); - - try { - StringMessage message = res.get(); - assertThat("hello moshe", equalTo(message.message)); - } catch (Exception e) { - assertThat(e.getMessage(), false, equalTo(true)); - } - - System.out.println("after ..."); - } - - @Test public void testErrorMessage() { - serviceA.registerHandler("sayHelloException", new BaseTransportRequestHandler() { - @Override public StringMessage newInstance() { - return new StringMessage(); - } - - @Override public void messageReceived(StringMessage request, TransportChannel channel) throws Exception { - System.out.println("got message: " + request.message); - assertThat("moshe", equalTo(request.message)); - throw new RuntimeException("bad message !!!"); - } - }); - - TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloException", - new StringMessage("moshe"), new BaseTransportResponseHandler() { - @Override public StringMessage newInstance() { - return new StringMessage(); - } - - @Override public void handleResponse(StringMessage response) { - assertThat("got response instead of exception", false, equalTo(true)); - } - - @Override public void handleException(RemoteTransportException exp) { - assertThat("bad message !!!", equalTo(exp.getCause().getMessage())); - } - }); - - try { - res.txGet(); - assertThat("exception should be thrown", false, equalTo(true)); - } catch (Exception e) { - assertThat("bad message !!!", equalTo(e.getCause().getMessage())); - } - - System.out.println("after ..."); - - } - - private class StringMessage implements Streamable { - - private String message; - - private StringMessage(String message) { - this.message = message; - } - - private StringMessage() { - } - - @Override public void readFrom(StreamInput in) throws IOException { - message = in.readUTF(); - } - - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(message); - } - } - } \ No newline at end of file diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyClient.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyClient.java index b5fd824dbed..c877e16e0ba 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyClient.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyClient.java @@ -19,7 +19,6 @@ package org.elasticsearch.transport.netty.benchmark; -import com.google.common.collect.Lists; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.cached.CachedThreadPool; @@ -62,7 +61,7 @@ public class BenchmarkNettyClient { final DiscoveryNode node = new DiscoveryNode("server", new InetSocketTransportAddress("localhost", 9999)); - transportService.nodesAdded(Lists.newArrayList(node)); + transportService.connectToNode(node); Thread[] clients = new Thread[NUMBER_OF_CLIENTS]; diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.java index b7df0756b54..a4e107f0cdf 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.java @@ -41,9 +41,9 @@ public class TransportClientDocumentActionsTests extends DocumentActionsTests { } @Override protected Client getClient2() { - TransportAddress server1Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress(); + TransportAddress server2Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress(); TransportClient client = new TransportClient(settingsBuilder().put("discovery.enabled", false).build()); - client.addTransportAddress(server1Address); + client.addTransportAddress(server2Address); return client; } }