EC2 Zen Discovery: Automatically use the configured transport port to ping other nodes, closes #854.
This commit is contained in:
parent
853ade7297
commit
3b72d63035
|
@ -58,6 +58,8 @@ import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.*;
|
||||||
*/
|
*/
|
||||||
public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implements ZenPing {
|
public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implements ZenPing {
|
||||||
|
|
||||||
|
public static final int LIMIT_PORTS_COUNT = 1;
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
private final TransportService transportService;
|
private final TransportService transportService;
|
||||||
|
@ -96,8 +98,8 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
|
||||||
for (String host : hosts) {
|
for (String host : hosts) {
|
||||||
try {
|
try {
|
||||||
TransportAddress[] addresses = transportService.addressesFromString(host);
|
TransportAddress[] addresses = transportService.addressesFromString(host);
|
||||||
// we only limit to 5 addresses, makes no sense to ping 100 ports
|
// we only limit to 1 addresses, makes no sense to ping 100 ports
|
||||||
for (int i = 0; (i < addresses.length && i < 5); i++) {
|
for (int i = 0; (i < addresses.length && i < LIMIT_PORTS_COUNT); i++) {
|
||||||
nodes.add(new DiscoveryNode("#zen_unicast_" + (++idCounter) + "#", addresses[i]));
|
nodes.add(new DiscoveryNode("#zen_unicast_" + (++idCounter) + "#", addresses[i]));
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -30,9 +30,10 @@ import org.elasticsearch.common.collect.Sets;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.common.transport.PortsRange;
|
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
||||||
|
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
|
||||||
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -51,9 +52,9 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
||||||
PUBLIC_DNS
|
PUBLIC_DNS
|
||||||
}
|
}
|
||||||
|
|
||||||
private final AmazonEC2 client;
|
private final TransportService transportService;
|
||||||
|
|
||||||
private final String ports;
|
private final AmazonEC2 client;
|
||||||
|
|
||||||
private final boolean bindAnyGroup;
|
private final boolean bindAnyGroup;
|
||||||
|
|
||||||
|
@ -65,12 +66,12 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
||||||
|
|
||||||
private final HostType hostType;
|
private final HostType hostType;
|
||||||
|
|
||||||
@Inject public AwsEc2UnicastHostsProvider(Settings settings, AmazonEC2 client) {
|
@Inject public AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AmazonEC2 client) {
|
||||||
super(settings);
|
super(settings);
|
||||||
|
this.transportService = transportService;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
|
|
||||||
this.hostType = HostType.valueOf(componentSettings.get("host_type", "private_ip").toUpperCase());
|
this.hostType = HostType.valueOf(componentSettings.get("host_type", "private_ip").toUpperCase());
|
||||||
this.ports = componentSettings.get("ports", "9300-9302");
|
|
||||||
|
|
||||||
this.bindAnyGroup = componentSettings.getAsBoolean("any_group", true);
|
this.bindAnyGroup = componentSettings.getAsBoolean("any_group", true);
|
||||||
this.groups = ImmutableSet.copyOf(componentSettings.getAsArray("groups"));
|
this.groups = ImmutableSet.copyOf(componentSettings.getAsArray("groups"));
|
||||||
|
@ -160,13 +161,19 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
||||||
address = instance.getPublicDnsName();
|
address = instance.getPublicDnsName();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
for (int port : new PortsRange(ports).ports()) {
|
if (address != null) {
|
||||||
if (address != null) {
|
try {
|
||||||
logger.trace("adding {}, address {}", instance.getInstanceId(), address);
|
TransportAddress[] addresses = transportService.addressesFromString(address);
|
||||||
discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + port, new InetSocketTransportAddress(address, port)));
|
// we only limit to 1 addresses, makes no sense to ping 100 ports
|
||||||
} else {
|
for (int i = 0; (i < addresses.length && i < UnicastZenPing.LIMIT_PORTS_COUNT); i++) {
|
||||||
logger.trace("not adding {}, address is null, host_type {}", instance.getInstanceId(), hostType);
|
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
|
||||||
|
discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + i, addresses[i]));
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("failed ot add {}, address {}", e, instance.getInstanceId(), address);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
logger.trace("not adding {}, address is null, host_type {}", instance.getInstanceId(), hostType);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class Ec2Discovery extends ZenDiscovery {
|
||||||
if (unicastZenPing != null) {
|
if (unicastZenPing != null) {
|
||||||
// update the unicast zen ping to add cloud hosts provider
|
// update the unicast zen ping to add cloud hosts provider
|
||||||
// and, while we are at it, use only it and not the multicast for example
|
// and, while we are at it, use only it and not the multicast for example
|
||||||
unicastZenPing.addHostsProvider(new AwsEc2UnicastHostsProvider(settings, ec2Service.client()));
|
unicastZenPing.addHostsProvider(new AwsEc2UnicastHostsProvider(settings, transportService, ec2Service.client()));
|
||||||
pingService.zenPings(ImmutableList.of(unicastZenPing));
|
pingService.zenPings(ImmutableList.of(unicastZenPing));
|
||||||
} else {
|
} else {
|
||||||
logger.warn("failed to apply ec2 unicast discovery, no unicast ping found");
|
logger.warn("failed to apply ec2 unicast discovery, no unicast ping found");
|
||||||
|
|
Loading…
Reference in New Issue