diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java index 051afacd3c2..cb92631e774 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/jgroups/JgroupsDiscovery.java @@ -173,7 +173,7 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent impl try { channel.send(new Message(channel.getView().getCreator(), channel.getAddress(), nodeMessagePayload())); addressSet = true; - logger.debug("Sent address [{}] to master [{}]", transportService.boundAddress().publishAddress(), channel.getView().getCreator()); + logger.debug("Sent (initial) node information to master [{}], node [{}]", channel.getView().getCreator(), localNode); } catch (Exception e) { logger.warn("Can't send address to master [" + channel.getView().getCreator() + "] will try again later...", e); } @@ -255,16 +255,21 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent impl logger.debug("Received node information from [{}], node [{}]", msg.getSrc(), newNode); } - clusterService.submitStateUpdateTask("jgroups-disco-receive(from node[" + newNode + "])", new ClusterStateUpdateTask() { - @Override public ClusterState execute(ClusterState currentState) { - if (currentState.nodes().nodeExists(newNode.id())) { - // no change, the node already exists in the cluster - logger.warn("Received an address [{}] for an existing node [{}]", newNode.address(), newNode); - return currentState; + if (!transportService.addressSupported(newNode.address().getClass())) { + // TODO, what should we do now? Maybe inform that node that its crap? + logger.warn("Received a wrong address type from [" + msg.getSrc() + "], ignoring... (received_address[" + newNode.address() + ")"); + } else { + clusterService.submitStateUpdateTask("jgroups-disco-receive(from node[" + newNode + "])", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + if (currentState.nodes().nodeExists(newNode.id())) { + // no change, the node already exists in the cluster + logger.warn("Received an address [{}] for an existing node [{}]", newNode.address(), newNode); + return currentState; + } + return newClusterStateBuilder().state(currentState).nodes(currentState.nodes().newNode(newNode)).build(); } - return newClusterStateBuilder().state(currentState).nodes(currentState.nodes().newNode(newNode)).build(); - } - }); + }); + } } catch (Exception e) { logger.warn("Can't read address from cluster member [" + msg.getSrc() + "] message [" + msg.getClass().getName() + "/" + msg + "]", e); } @@ -290,7 +295,7 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent impl if (!addressSet) { try { channel.send(new Message(newView.getCreator(), channel.getAddress(), nodeMessagePayload())); - logger.debug("Sent address [{}] to master [{}]", localNode.address(), newView.getCreator()); + logger.debug("Sent (view) node information to master [{}], node [{}]", newView.getCreator(), localNode); addressSet = true; } catch (Exception e) { logger.warn("Can't send address to master [" + newView.getCreator() + "] will try again later...", e); @@ -331,7 +336,7 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent impl } if (!foundMe) { - logger.warn("Disconnected from cluster, resending address [{}] to master [{}]", localNode.address(), newView.getCreator()); + logger.warn("Disconnected from cluster, resending to master [{}], node [{}]", newView.getCreator(), localNode); try { channel.send(new Message(newView.getCreator(), channel.getAddress(), nodeMessagePayload())); addressSet = true; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java index 0501efb83e2..272a7ec695d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java @@ -23,11 +23,12 @@ import org.elasticsearch.cluster.node.Node; import org.elasticsearch.util.component.LifecycleComponent; import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.transport.BoundTransportAddress; +import org.elasticsearch.util.transport.TransportAddress; import java.io.IOException; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public interface Transport extends LifecycleComponent { @@ -64,6 +65,11 @@ public interface Transport extends LifecycleComponent { BoundTransportAddress boundAddress(); + /** + * Is the address type supported. + */ + boolean addressSupported(Class address); + void nodesAdded(Iterable nodes); void nodesRemoved(Iterable nodes); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index c8d6fdfe87e..88ec72ae654 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -28,6 +28,7 @@ import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashMapLong; import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.transport.BoundTransportAddress; +import org.elasticsearch.util.transport.TransportAddress; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -87,6 +88,10 @@ public class TransportService extends AbstractLifecycleComponent address) { + return transport.addressSupported(address); + } + public BoundTransportAddress boundAddress() { return transport.boundAddress(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 50ef992c128..7f3ed394373 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -67,6 +67,10 @@ public class LocalTransport extends AbstractLifecycleComponent implem this.threadPool = threadPool; } + @Override public boolean addressSupported(Class address) { + return LocalTransportAddress.class.equals(address); + } + @Override protected void doStart() throws ElasticSearchException { localAddress = new LocalTransportAddress(Long.toString(transportAddressIdGenerator.incrementAndGet())); transports.put(localAddress, this); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 918222f98c2..a6eb3160588 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -328,6 +328,10 @@ public class NettyTransport extends AbstractLifecycleComponent implem @Override protected void doClose() throws ElasticSearchException { } + @Override public boolean addressSupported(Class address) { + return InetSocketTransportAddress.class.equals(address); + } + @Override public BoundTransportAddress boundAddress() { return this.boundAddress; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/BoundTransportAddress.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/BoundTransportAddress.java index ab07c795234..c315db7839e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/BoundTransportAddress.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/BoundTransportAddress.java @@ -24,7 +24,7 @@ package org.elasticsearch.util.transport; * the address the transport is bounded on, the the published one represents the one clients should * communicate on. * - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class BoundTransportAddress {