From 453ede8f57b3d12cf9eda563b82aa12057fbd18a Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 25 Apr 2010 12:01:11 +0300 Subject: [PATCH] zen discovery, support for unicast discovery --- .idea/dictionaries/kimchy.xml | 1 + .../cluster/node/DiscoveryNodes.java | 10 + .../discovery/zen/ZenDiscovery.java | 6 +- .../discovery/zen/ping/ZenPing.java | 21 +- .../discovery/zen/ping/ZenPingService.java | 11 +- .../zen/ping/multicast/MulticastZenPing.java | 36 +-- .../zen/ping/unicast/UnicastZenPing.java | 298 ++++++++++++++++++ .../transport/ConnectTransportException.java | 14 +- .../NodeDisconnectedTransportException.java | 4 +- .../elasticsearch/transport/Transport.java | 8 + .../transport/TransportService.java | 4 + .../transport/local/LocalTransport.java | 4 + .../transport/netty/NettyTransport.java | 10 + 13 files changed, 395 insertions(+), 32 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml index 4e8f214db02..552ebba3435 100644 --- a/.idea/dictionaries/kimchy.xml +++ b/.idea/dictionaries/kimchy.xml @@ -73,6 +73,7 @@ traslog trie tuple + unicast unregister uuid versioned diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index 3c5cdf8f754..67981e62662 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -25,6 +25,7 @@ import com.google.common.collect.UnmodifiableIterator; import org.elasticsearch.util.Nullable; import org.elasticsearch.util.io.stream.StreamInput; import org.elasticsearch.util.io.stream.StreamOutput; +import org.elasticsearch.util.transport.TransportAddress; import java.io.IOException; import java.util.List; @@ -143,6 +144,15 @@ public class DiscoveryNodes implements Iterable { return masterNode(); } + public DiscoveryNode findByAddress(TransportAddress address) { + for (DiscoveryNode node : nodes.values()) { + if (node.address().equals(address)) { + return node; + } + } + return null; + } + public DiscoveryNodes removeDeadMembers(Set newNodes, String masterNodeId) { Builder builder = new Builder().masterNodeId(masterNodeId).localNodeId(localNodeId); for (DiscoveryNode node : this) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 7045c0d2d58..68a60a333a7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -120,7 +120,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen boolean retry = true; while (retry) { retry = false; - DiscoveryNode masterNode = pingTillMasterResolved(); + DiscoveryNode masterNode = broadBingTillMasterResolved(); if (localNode.equals(masterNode)) { // we are the master (first) this.firstMaster = true; @@ -182,7 +182,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen logger.debug("Failed to send leave request to master [{}]", e, latestDiscoNodes.masterNode()); } } else { - DiscoveryNode[] possibleMasters = electMaster.nextPossibleMasters(latestDiscoNodes.nodes().values(), 3); + DiscoveryNode[] possibleMasters = electMaster.nextPossibleMasters(latestDiscoNodes.nodes().values(), 5); for (DiscoveryNode possibleMaster : possibleMasters) { if (localNode.equals(possibleMaster)) { continue; @@ -365,7 +365,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } } - private DiscoveryNode pingTillMasterResolved() { + private DiscoveryNode broadBingTillMasterResolved() { while (true) { ZenPing.PingResponse[] pingResponses = pingService.pingAndWait(initialPingTimeout); List pingMasters = newArrayList(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java index 06475a76dcf..3b068dc3683 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery.zen.ping; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.util.TimeValue; @@ -30,6 +31,7 @@ import org.elasticsearch.util.io.stream.Streamable; import java.io.IOException; +import static org.elasticsearch.cluster.ClusterName.*; import static org.elasticsearch.cluster.node.DiscoveryNode.*; /** @@ -48,16 +50,23 @@ public interface ZenPing extends LifecycleComponent { public class PingResponse implements Streamable { + private ClusterName clusterName; + private DiscoveryNode target; private DiscoveryNode master; - public PingResponse() { + private PingResponse() { } - public PingResponse(DiscoveryNode target, DiscoveryNode master) { + public PingResponse(DiscoveryNode target, DiscoveryNode master, ClusterName clusterName) { this.target = target; this.master = master; + this.clusterName = clusterName; + } + + public ClusterName clusterName() { + return this.clusterName; } public DiscoveryNode target() { @@ -68,7 +77,14 @@ public interface ZenPing extends LifecycleComponent { return master; } + public static PingResponse readPingResponse(StreamInput in) throws IOException { + PingResponse response = new PingResponse(); + response.readFrom(in); + return response; + } + @Override public void readFrom(StreamInput in) throws IOException { + clusterName = readClusterName(in); target = readNode(in); if (in.readBoolean()) { master = readNode(in); @@ -76,6 +92,7 @@ public interface ZenPing extends LifecycleComponent { } @Override public void writeTo(StreamOutput out) throws IOException { + clusterName.writeTo(out); target.writeTo(out); if (master == null) { out.writeBoolean(false); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java index d9938e54292..f30effb39e9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing; +import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.util.TimeValue; @@ -49,7 +50,15 @@ public class ZenPingService extends AbstractLifecycleComponent implemen @Inject public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { super(settings); - this.zenPings = ImmutableList.of(new MulticastZenPing(settings, threadPool, transportService, clusterName)); + ImmutableList.Builder zenPingsBuilder = ImmutableList.builder(); + if (componentSettings.getAsBoolean("multicast.enabled", true)) { + zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName)); + } + if (componentSettings.getAsArray("unicast.hosts").length > 0) { + zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName)); + } + + this.zenPings = zenPingsBuilder.build(); } public ImmutableList zenPings() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 538a204cdde..e49ef79975c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -109,7 +109,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem this.bufferSize = componentSettings.getAsInt("buffer_size", 2048); this.ttl = componentSettings.getAsInt("ttl", 3); - this.transportService.registerHandler(PingResponseRequestHandler.ACTION, new PingResponseRequestHandler()); + this.transportService.registerHandler(MulticastPingResponseRequestHandler.ACTION, new MulticastPingResponseRequestHandler()); } @Override public void setNodesProvider(DiscoveryNodesProvider nodesProvider) { @@ -197,7 +197,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem ConcurrentMap responses = receivedResponses.remove(id); listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); } - }, timeout.millis(), TimeUnit.MILLISECONDS); + }, timeout); } private void sendPingRequest(int id) { @@ -224,15 +224,15 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } } - private class PingResponseRequestHandler extends BaseTransportRequestHandler { + class MulticastPingResponseRequestHandler extends BaseTransportRequestHandler { static final String ACTION = "discovery/zen/multicast"; - @Override public WrappedPingResponse newInstance() { - return new WrappedPingResponse(); + @Override public MulticastPingResponse newInstance() { + return new MulticastPingResponse(); } - @Override public void messageReceived(WrappedPingResponse request, TransportChannel channel) throws Exception { + @Override public void messageReceived(MulticastPingResponse request, TransportChannel channel) throws Exception { if (logger.isTraceEnabled()) { logger.trace("[{}] Received {}", request.id, request.pingResponse); } @@ -246,24 +246,18 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } } - class WrappedPingResponse implements Streamable { + static class MulticastPingResponse implements Streamable { int id; PingResponse pingResponse; - WrappedPingResponse() { - } - - WrappedPingResponse(int id, PingResponse pingResponse) { - this.id = id; - this.pingResponse = pingResponse; + MulticastPingResponse() { } @Override public void readFrom(StreamInput in) throws IOException { id = in.readInt(); - pingResponse = new PingResponse(); - pingResponse.readFrom(in); + pingResponse = PingResponse.readPingResponse(in); } @Override public void writeTo(StreamOutput out) throws IOException { @@ -318,12 +312,12 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem // not our cluster, ignore it... continue; } - final WrappedPingResponse wrappedPingResponse = new WrappedPingResponse(); - wrappedPingResponse.id = id; - wrappedPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode()); + final MulticastPingResponse multicastPingResponse = new MulticastPingResponse(); + multicastPingResponse.id = id; + multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName); if (logger.isTraceEnabled()) { - logger.trace("[{}] Received ping_request from [{}], sending {}", id, requestingNode, wrappedPingResponse.pingResponse); + logger.trace("[{}] Received ping_request from [{}], sending {}", id, requestingNode, multicastPingResponse.pingResponse); } if (!transportService.nodeConnected(requestingNode)) { @@ -336,7 +330,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } catch (Exception e) { logger.warn("Failed to connect to requesting node {}", e, requestingNode); } - transportService.sendRequest(requestingNode, PingResponseRequestHandler.ACTION, wrappedPingResponse, new VoidTransportResponseHandler(false) { + transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(false) { @Override public void handleException(RemoteTransportException exp) { logger.warn("Failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); } @@ -344,7 +338,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } }); } else { - transportService.sendRequest(requestingNode, PingResponseRequestHandler.ACTION, wrappedPingResponse, new VoidTransportResponseHandler(false) { + transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(false) { @Override public void handleException(RemoteTransportException exp) { logger.warn("Failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java new file mode 100644 index 00000000000..808ddd5b83c --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -0,0 +1,298 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen.ping.unicast; + +import com.google.common.collect.ImmutableList; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.discovery.zen.ping.ZenPing; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; +import org.elasticsearch.util.TimeValue; +import org.elasticsearch.util.component.AbstractLifecycleComponent; +import org.elasticsearch.util.concurrent.jsr166y.LinkedTransferQueue; +import org.elasticsearch.util.io.stream.StreamInput; +import org.elasticsearch.util.io.stream.StreamOutput; +import org.elasticsearch.util.io.stream.Streamable; +import org.elasticsearch.util.settings.Settings; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.collect.Lists.*; +import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.*; +import static org.elasticsearch.util.TimeValue.*; +import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; + +/** + * @author kimchy (shay.banon) + */ +public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPing { + + private final ThreadPool threadPool; + + private final TransportService transportService; + + private final ClusterName clusterName; + + + private final String[] hosts; + + private final DiscoveryNode[] nodes; + + private volatile DiscoveryNodesProvider nodesProvider; + + private final AtomicInteger pingIdGenerator = new AtomicInteger(); + + private final Map> receivedResponses = newConcurrentMap(); + + // a list of temporal responses a node will return for a request (holds requests from other nodes) + private final Queue temporalResponses = new LinkedTransferQueue(); + + public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { + super(settings); + this.threadPool = threadPool; + this.transportService = transportService; + this.clusterName = clusterName; + + this.hosts = componentSettings.getAsArray("hosts"); + this.nodes = new DiscoveryNode[hosts.length]; + for (int i = 0; i < hosts.length; i++) { + try { + nodes[i] = new DiscoveryNode("#zen_unicast_" + i + "#", transportService.addressFromString(hosts[i])); + } catch (Exception e) { + throw new ElasticSearchIllegalArgumentException("Failed to resolve address for [" + hosts[i] + "]", e); + } + } + + transportService.registerHandler(UnicastPingRequestHandler.ACTION, new UnicastPingRequestHandler()); + } + + @Override protected void doStart() throws ElasticSearchException { + } + + @Override protected void doStop() throws ElasticSearchException { + } + + @Override protected void doClose() throws ElasticSearchException { + transportService.removeHandler(UnicastPingRequestHandler.ACTION); + } + + protected List buildDynamicNodes() { + return ImmutableList.of(); + } + + @Override public void setNodesProvider(DiscoveryNodesProvider nodesProvider) { + this.nodesProvider = nodesProvider; + } + + @Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException { + final int id = pingIdGenerator.incrementAndGet(); + receivedResponses.put(id, new ConcurrentHashMap()); + sendPings(id, timeout, false); + threadPool.schedule(new Runnable() { + @Override public void run() { + sendPings(id, timeout, true); + ConcurrentMap responses = receivedResponses.remove(id); + listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); + } + }, timeout); + } + + private void sendPings(int id, TimeValue timeout, boolean wait) { + UnicastPingRequest pingRequest = new UnicastPingRequest(); + pingRequest.id = id; + pingRequest.timeout = timeout; + DiscoveryNodes discoNodes = nodesProvider.nodes(); + pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName); + + List nodesToPing = newArrayList(nodes); + nodesToPing.addAll(buildDynamicNodes()); + + final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); + for (final DiscoveryNode node : nodesToPing) { + // make sure we are connected + boolean disconnectX; + DiscoveryNode nodeToSendX = discoNodes.findByAddress(node.address()); + if (nodeToSendX != null) { + disconnectX = false; + } else { + nodeToSendX = node; + disconnectX = true; + } + final DiscoveryNode nodeToSend = nodeToSendX; + try { + transportService.connectToNode(nodeToSend); + } catch (ConnectTransportException e) { + latch.countDown(); + // can't connect to the node + continue; + } + + final boolean disconnect = disconnectX; + transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TimeValue.timeValueMillis((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler() { + + @Override public UnicastPingResponse newInstance() { + return new UnicastPingResponse(); + } + + @Override public void handleResponse(UnicastPingResponse response) { + try { + DiscoveryNodes discoveryNodes = nodesProvider.nodes(); + for (PingResponse pingResponse : response.pingResponses) { + if (disconnect) { + transportService.disconnectFromNode(nodeToSend); + } + if (pingResponse.target().id().equals(discoveryNodes.localNodeId())) { + // that's us, ignore + continue; + } + if (!pingResponse.clusterName().equals(clusterName)) { + // not part of the cluster + return; + } + ConcurrentMap responses = receivedResponses.get(response.id); + if (responses == null) { + logger.warn("Received ping response with no matching id [{}]", response.id); + } else { + responses.put(pingResponse.target(), pingResponse); + } + } + } finally { + latch.countDown(); + } + } + + @Override public void handleException(RemoteTransportException exp) { + latch.countDown(); + if (exp instanceof ConnectTransportException) { + // ok, not connected... + } else { + if (disconnect) { + transportService.disconnectFromNode(nodeToSend); + } + logger.warn("Failed to send ping to [{}]", exp, node); + } + } + }); + } + if (wait) { + try { + latch.await(timeout.millis() * 5, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // ignore + } + } + } + + private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) { + temporalResponses.add(request.pingResponse); + threadPool.schedule(new Runnable() { + @Override public void run() { + temporalResponses.remove(request.pingResponse); + } + }, request.timeout.millis() * 2, TimeUnit.MILLISECONDS); + + List pingResponses = newArrayList(temporalResponses); + DiscoveryNodes discoNodes = nodesProvider.nodes(); + pingResponses.add(new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName)); + + + UnicastPingResponse unicastPingResponse = new UnicastPingResponse(); + unicastPingResponse.id = request.id; + unicastPingResponse.pingResponses = pingResponses.toArray(new PingResponse[pingResponses.size()]); + + return unicastPingResponse; + } + + class UnicastPingRequestHandler extends BaseTransportRequestHandler { + + static final String ACTION = "discovery/zen/unicast"; + + @Override public UnicastPingRequest newInstance() { + return new UnicastPingRequest(); + } + + @Override public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception { + channel.sendResponse(handlePingRequest(request)); + } + } + + static class UnicastPingRequest implements Streamable { + + int id; + + TimeValue timeout; + + PingResponse pingResponse; + + UnicastPingRequest() { + } + + @Override public void readFrom(StreamInput in) throws IOException { + id = in.readInt(); + timeout = readTimeValue(in); + pingResponse = readPingResponse(in); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeInt(id); + timeout.writeTo(out); + pingResponse.writeTo(out); + } + } + + static class UnicastPingResponse implements Streamable { + + int id; + + PingResponse[] pingResponses; + + UnicastPingResponse() { + } + + @Override public void readFrom(StreamInput in) throws IOException { + id = in.readInt(); + pingResponses = new PingResponse[in.readVInt()]; + for (int i = 0; i < pingResponses.length; i++) { + pingResponses[i] = readPingResponse(in); + } + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeInt(id); + out.writeVInt(pingResponses.length); + for (PingResponse pingResponse : pingResponses) { + pingResponse.writeTo(out); + } + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/ConnectTransportException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/ConnectTransportException.java index ed1a9db6546..61c5503ecf4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/ConnectTransportException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/ConnectTransportException.java @@ -24,16 +24,24 @@ import org.elasticsearch.cluster.node.DiscoveryNode; /** * @author kimchy (shay.banon) */ -public class ConnectTransportException extends TransportException { +public class ConnectTransportException extends RemoteTransportException { private final DiscoveryNode node; public ConnectTransportException(DiscoveryNode node, String msg) { - this(node, msg, null); + this(node, msg, null, null); + } + + public ConnectTransportException(DiscoveryNode node, String msg, String action) { + this(node, msg, action, null); } public ConnectTransportException(DiscoveryNode node, String msg, Throwable cause) { - super(node + ": " + msg, cause); + this(node, msg, null, cause); + } + + public ConnectTransportException(DiscoveryNode node, String msg, String action, Throwable cause) { + super(node.name(), node.address(), action, cause); this.node = node; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/NodeDisconnectedTransportException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/NodeDisconnectedTransportException.java index c40e8a0a87d..62e0e656dd6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/NodeDisconnectedTransportException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/NodeDisconnectedTransportException.java @@ -24,10 +24,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode; /** * @author kimchy (shay.banon) */ -public class NodeDisconnectedTransportException extends RemoteTransportException { +public class NodeDisconnectedTransportException extends ConnectTransportException { public NodeDisconnectedTransportException(DiscoveryNode node, String action) { - super(node.name(), node.address(), action, null); + super(node, "disconnected", action, null); } // @Override public Throwable fillInStackTrace() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java index ea6222d73ae..8d3adc1c978 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java @@ -63,8 +63,16 @@ public interface Transport extends LifecycleComponent { void transportServiceAdapter(TransportServiceAdapter service); + /** + * The address the transport is bound on. + */ BoundTransportAddress boundAddress(); + /** + * Returns an address from its string representation. + */ + TransportAddress addressFromString(String address) throws Exception; + /** * Is the address type supported. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index 0339c6502bf..37cb31a7052 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -180,6 +180,10 @@ public class TransportService extends AbstractLifecycleComponent implem this.threadPool = threadPool; } + @Override public TransportAddress addressFromString(String address) { + return new LocalTransportAddress(address); + } + @Override public boolean addressSupported(Class address) { return LocalTransportAddress.class.equals(address); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 4741106f840..2a2a59f6b18 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -314,6 +314,16 @@ public class NettyTransport extends AbstractLifecycleComponent implem @Override protected void doClose() throws ElasticSearchException { } + @Override public TransportAddress addressFromString(String address) throws Exception { + int index = address.lastIndexOf(':'); + if (index == -1) { + throw new ElasticSearchIllegalStateException("Port must be provided to create inet address from [" + address + "]"); + } + String host = address.substring(0, index); + int port = Integer.parseInt(address.substring(index + 1)); + return new InetSocketTransportAddress(host, port); + } + @Override public boolean addressSupported(Class address) { return InetSocketTransportAddress.class.equals(address); }