allow to also define disocvery.zen.ping.unicast.hosts with a single comma delimited hosts. Also allow to define ports range (i.e. host[9300-9305])

This commit is contained in:
kimchy 2010-04-26 12:27:04 +03:00
parent 2878ae7dd6
commit 5f98942911
7 changed files with 60 additions and 20 deletions

View File

@ -54,7 +54,7 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
if (componentSettings.getAsBoolean("multicast.enabled", true)) {
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName));
}
if (componentSettings.getAsArray("unicast.hosts").length > 0) {
if (componentSettings.get("unicast.hosts") != null || componentSettings.getAsArray("unicast.hosts").length > 0) {
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName));
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.discovery.zen.ping.unicast;
import org.elasticsearch.util.Strings;
import org.elasticsearch.util.gcommon.collect.ImmutableList;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
@ -32,10 +33,12 @@ import org.elasticsearch.transport.*;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.util.gcommon.collect.Lists;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.TransportAddress;
import java.io.IOException;
import java.util.List;
@ -64,8 +67,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private final ClusterName clusterName;
private final String[] hosts;
private final DiscoveryNode[] nodes;
private volatile DiscoveryNodesProvider nodesProvider;
@ -83,15 +84,22 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
this.transportService = transportService;
this.clusterName = clusterName;
this.hosts = componentSettings.getAsArray("hosts");
this.nodes = new DiscoveryNode[hosts.length];
for (int i = 0; i < hosts.length; i++) {
List<String> hosts = Lists.newArrayList(componentSettings.getAsArray("hosts"));
if (componentSettings.get("hosts") != null) {
hosts.addAll(Strings.commaDelimitedListToSet(componentSettings.get("hosts")));
}
List<DiscoveryNode> nodes = Lists.newArrayList();
int idCounter = 0;
for (String host : hosts) {
try {
nodes[i] = new DiscoveryNode("#zen_unicast_" + i + "#", transportService.addressFromString(hosts[i]));
for (TransportAddress address : transportService.addressesFromString(host)) {
nodes.add(new DiscoveryNode("#zen_unicast_" + (++idCounter) + "#", address));
}
} catch (Exception e) {
throw new ElasticSearchIllegalArgumentException("Failed to resolve address for [" + hosts[i] + "]", e);
throw new ElasticSearchIllegalArgumentException("Failed to resolve address for [" + host + "]", e);
}
}
this.nodes = nodes.toArray(new DiscoveryNode[nodes.size()]);
transportService.registerHandler(UnicastPingRequestHandler.ACTION, new UnicastPingRequestHandler());
}

View File

@ -71,7 +71,7 @@ public interface Transport extends LifecycleComponent<Transport> {
/**
* Returns an address from its string representation.
*/
TransportAddress addressFromString(String address) throws Exception;
TransportAddress[] addressesFromString(String address) throws Exception;
/**
* Is the address type supported.

View File

@ -180,8 +180,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return requestIds.getAndIncrement();
}
public TransportAddress addressFromString(String address) throws Exception {
return transport.addressFromString(address);
public TransportAddress[] addressesFromString(String address) throws Exception {
return transport.addressesFromString(address);
}
public void registerHandler(ActionTransportRequestHandler handler) {

View File

@ -70,8 +70,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
this.threadPool = threadPool;
}
@Override public TransportAddress addressFromString(String address) {
return new LocalTransportAddress(address);
@Override public TransportAddress[] addressesFromString(String address) {
return new TransportAddress[] {new LocalTransportAddress(address)};
}
@Override public boolean addressSupported(Class<? extends TransportAddress> address) {

View File

@ -19,6 +19,8 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.util.Strings;
import org.elasticsearch.util.gcommon.collect.Lists;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
@ -53,6 +55,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
@ -314,14 +317,28 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
@Override protected void doClose() throws ElasticSearchException {
}
@Override public TransportAddress addressFromString(String address) throws Exception {
int index = address.lastIndexOf(':');
if (index == -1) {
throw new ElasticSearchIllegalStateException("Port must be provided to create inet address from [" + address + "]");
@Override public TransportAddress[] addressesFromString(String address) throws Exception {
int index = address.indexOf('[');
if (index != -1) {
String host = address.substring(0, index);
Set<String> ports = Strings.commaDelimitedListToSet(address.substring(index + 1, address.indexOf(']')));
List<TransportAddress> addresses = Lists.newArrayList();
for (String port : ports) {
int[] iPorts = new PortsRange(port).ports();
for (int iPort : iPorts) {
addresses.add(new InetSocketTransportAddress(host, iPort));
}
}
return addresses.toArray(new TransportAddress[addresses.size()]);
} else {
index = address.lastIndexOf(':');
if (index == -1) {
throw new ElasticSearchIllegalStateException("Port must be provided to create inet address from [" + address + "]");
}
String host = address.substring(0, index);
int port = Integer.parseInt(address.substring(index + 1));
return new TransportAddress[] {new InetSocketTransportAddress(host, port)};
}
String host = address.substring(0, index);
int port = Integer.parseInt(address.substring(index + 1));
return new InetSocketTransportAddress(host, port);
}
@Override public boolean addressSupported(Class<? extends TransportAddress> address) {

View File

@ -19,6 +19,10 @@
package org.elasticsearch.util.transport;
import org.elasticsearch.util.gcommon.collect.Lists;
import org.elasticsearch.util.gnu.trove.TIntArrayList;
import java.util.List;
import java.util.StringTokenizer;
/**
@ -32,6 +36,17 @@ public class PortsRange {
this.portRange = portRange;
}
public int[] ports() throws NumberFormatException {
final TIntArrayList ports = new TIntArrayList();
iterate(new PortCallback() {
@Override public boolean onPortNumber(int portNumber) {
ports.add(portNumber);
return false;
}
});
return ports.toNativeArray();
}
public boolean iterate(PortCallback callback) throws NumberFormatException {
StringTokenizer st = new StringTokenizer(portRange, ",");
boolean success = false;