Filter instances using EC2 API instead of locally

This change will allow EC2 API to filter by tags, AZ, and instance state.  In the situation where you have a large number of instances/reservations, this can be a performance boost.

Note that we still do the security group filter locally due to the different strategies (all or some must match).

Closes #39.
This commit is contained in:
Joe Pollard 2013-10-13 21:11:26 -05:00 committed by David Pilato
parent 283d1748b5
commit 712baa42ae
1 changed files with 53 additions and 64 deletions

View File

@ -99,7 +99,12 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
DescribeInstancesResult descInstances;
try {
descInstances = client.describeInstances(new DescribeInstancesRequest());
// Query EC2 API based on AZ, instance state, and tag.
// NOTE: we don't filter by security group during the describe instances request for two reasons:
// 1. differences in VPCs require different parameters during query (ID vs Name)
// 2. We want to use two different strategies: (all security groups vs. any security groups)
descInstances = client.describeInstances(buildDescribeInstancesRequest());
} catch (AmazonClientException e) {
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
logger.debug("Full exception:", e);
@ -109,13 +114,6 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
logger.trace("building dynamic unicast discovery nodes...");
for (Reservation reservation : descInstances.getReservations()) {
for (Instance instance : reservation.getInstances()) {
if (!availabilityZones.isEmpty()) {
if (!availabilityZones.contains(instance.getPlacement().getAvailabilityZone())) {
logger.trace("filtering out instance {} based on availability_zone {}, not part of {}", instance.getInstanceId(), instance.getPlacement().getAvailabilityZone(), availabilityZones);
continue;
}
}
// lets see if we can filter based on groups
if (!groups.isEmpty()) {
List<GroupIdentifier> instanceSecurityGroups = instance.getSecurityGroups();
@ -138,66 +136,34 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
}
}
// see if we need to filter by tags
boolean filterByTag = false;
if (!tags.isEmpty()) {
if (instance.getTags() == null) {
filterByTag = true;
} else {
// check that all tags listed are there on the instance
for (Map.Entry<String, String> entry : tags.entrySet()) {
boolean found = false;
for (Tag tag : instance.getTags()) {
if (entry.getKey().equals(tag.getKey()) && entry.getValue().equals(tag.getValue())) {
found = true;
break;
}
}
if (!found) {
filterByTag = true;
break;
}
}
}
String address = null;
switch (hostType) {
case PRIVATE_DNS:
address = instance.getPrivateDnsName();
break;
case PRIVATE_IP:
address = instance.getPrivateIpAddress();
break;
case PUBLIC_DNS:
address = instance.getPublicDnsName();
break;
case PUBLIC_IP:
address = instance.getPublicDnsName();
break;
}
if (filterByTag) {
logger.trace("filtering out instance {} based tags {}, not part of {}", instance.getInstanceId(), tags, instance.getTags());
continue;
}
InstanceState state = instance.getState();
if (state.getName().equalsIgnoreCase("pending") || state.getName().equalsIgnoreCase("running")) {
String address = null;
switch (hostType) {
case PRIVATE_DNS:
address = instance.getPrivateDnsName();
break;
case PRIVATE_IP:
address = instance.getPrivateIpAddress();
break;
case PUBLIC_DNS:
address = instance.getPublicDnsName();
break;
case PUBLIC_IP:
address = instance.getPublicDnsName();
break;
}
if (address != null) {
try {
TransportAddress[] addresses = transportService.addressesFromString(address);
// we only limit to 1 address, makes no sense to ping 100 ports
for (int i = 0; (i < addresses.length && i < UnicastZenPing.LIMIT_PORTS_COUNT); i++) {
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + i, addresses[i], Version.CURRENT));
}
} catch (Exception e) {
logger.warn("failed to add {}, address {}", e, instance.getInstanceId(), address);
if (address != null) {
try {
TransportAddress[] addresses = transportService.addressesFromString(address);
// we only limit to 1 addresses, makes no sense to ping 100 ports
for (int i = 0; (i < addresses.length && i < UnicastZenPing.LIMIT_PORTS_COUNT); i++) {
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + i, addresses[i], Version.CURRENT));
}
} else {
logger.trace("not adding {}, address is null, host_type {}", instance.getInstanceId(), hostType);
} catch (Exception e) {
logger.warn("failed ot add {}, address {}", e, instance.getInstanceId(), address);
}
} else {
logger.trace("not adding {}, state {} is not pending or running", instance.getInstanceId(), state.getName());
logger.trace("not adding {}, address is null, host_type {}", instance.getInstanceId(), hostType);
}
}
}
@ -206,4 +172,27 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
return discoNodes;
}
private DescribeInstancesRequest buildDescribeInstancesRequest() {
DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest()
.withFilters(
new Filter("instance-state-name").withValues("running", "pending")
);
for (Map.Entry<String, String> tagFilter : tags.entrySet()) {
// for a given tag key, OR relationship for multiple different values
describeInstancesRequest.withFilters(
new Filter("tag:" + tagFilter.getKey()).withValues(tagFilter.getValue())
);
}
if (!availabilityZones.isEmpty()) {
// OR relationship amongst multiple values of the availability-zone filter
describeInstancesRequest.withFilters(
new Filter("availability-zone").withValues(availabilityZones)
);
}
return describeInstancesRequest;
}
}