diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java index f0965b6c098..ec02a4850f9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java @@ -32,6 +32,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Lists; import com.metamx.emitter.EmittingLogger; import io.druid.indexing.overlord.autoscaling.AutoScaler; @@ -45,6 +46,7 @@ import java.util.List; public class EC2AutoScaler implements AutoScaler { private static final EmittingLogger log = new EmittingLogger(EC2AutoScaler.class); + public static final int MAX_AWS_FILTER_VALUES = 100; private final int minNumWorkers; private final int maxNumWorkers; @@ -244,29 +246,35 @@ public class EC2AutoScaler implements AutoScaler @Override public List ipToIdLookup(List ips) { - DescribeInstancesResult result = amazonEC2Client.describeInstances( - new DescribeInstancesRequest() - .withFilters( - new Filter("private-ip-address", ips) - ) - ); - - List instances = Lists.newArrayList(); - for (Reservation reservation : result.getReservations()) { - instances.addAll(reservation.getInstances()); - } - - List retVal = Lists.transform( - instances, - new Function() + final List retVal = FluentIterable + // chunk requests to avoid hitting default AWS limits on filters + .from(Lists.partition(ips, MAX_AWS_FILTER_VALUES)) + .transformAndConcat(new Function, Iterable>() { @Override - public String apply(Instance input) + public Iterable apply(List input) { - return input.getInstanceId(); + return amazonEC2Client.describeInstances( + new DescribeInstancesRequest().withFilters(new Filter("private-ip-address", input)) + ).getReservations(); } - } - ); + }) + .transformAndConcat(new Function>() + { + @Override + public Iterable apply(Reservation reservation) + { + return reservation.getInstances(); + } + }) + .transform(new Function() + { + @Override + public String apply(Instance instance) + { + return instance.getInstanceId(); + } + }).toList(); log.debug("Performing lookup: %s --> %s", ips, retVal); @@ -276,29 +284,35 @@ public class EC2AutoScaler implements AutoScaler @Override public List idToIpLookup(List nodeIds) { - DescribeInstancesResult result = amazonEC2Client.describeInstances( - new DescribeInstancesRequest() - .withFilters( - new Filter("instance-id", nodeIds) - ) - ); - - List instances = Lists.newArrayList(); - for (Reservation reservation : result.getReservations()) { - instances.addAll(reservation.getInstances()); - } - - List retVal = Lists.transform( - instances, - new Function() + final List retVal = FluentIterable + // chunk requests to avoid hitting default AWS limits on filters + .from(Lists.partition(nodeIds, MAX_AWS_FILTER_VALUES)) + .transformAndConcat(new Function, Iterable>() { @Override - public String apply(Instance input) + public Iterable apply(List input) { - return input.getPrivateIpAddress(); + return amazonEC2Client.describeInstances( + new DescribeInstancesRequest().withFilters(new Filter("instance-id", input)) + ).getReservations(); } - } - ); + }) + .transformAndConcat(new Function>() + { + @Override + public Iterable apply(Reservation reservation) + { + return reservation.getInstances(); + } + }) + .transform(new Function() + { + @Override + public String apply(Instance instance) + { + return instance.getPrivateIpAddress(); + } + }).toList(); log.debug("Performing lookup: %s --> %s", nodeIds, retVal); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java index 7e33c85cf00..2bcfeb03b2d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java @@ -20,12 +20,17 @@ package io.druid.indexing.overlord.autoscaling; import com.amazonaws.services.ec2.AmazonEC2Client; import com.amazonaws.services.ec2.model.DescribeInstancesRequest; import com.amazonaws.services.ec2.model.DescribeInstancesResult; +import com.amazonaws.services.ec2.model.Filter; import com.amazonaws.services.ec2.model.Instance; import com.amazonaws.services.ec2.model.Reservation; import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.TerminateInstancesRequest; +import com.google.common.base.Functions; +import com.google.common.collect.ContiguousSet; +import com.google.common.collect.DiscreteDomain; import com.google.common.collect.Lists; +import com.google.common.collect.Range; import io.druid.indexing.overlord.autoscaling.ec2.EC2AutoScaler; import io.druid.indexing.overlord.autoscaling.ec2.EC2EnvironmentConfig; import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData; @@ -38,7 +43,9 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.Date; +import java.util.List; /** */ @@ -46,10 +53,14 @@ public class EC2AutoScalerTest { private static final String AMI_ID = "dummy"; private static final String INSTANCE_ID = "theInstance"; + public static final EC2EnvironmentConfig ENV_CONFIG = new EC2EnvironmentConfig( + "us-east-1a", + new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.newArrayList(), "foo", "mySubnet", null, null), + new GalaxyEC2UserData(new DefaultObjectMapper(), "env", "version", "type") + ); private static final String IP = "dummyIP"; private AmazonEC2Client amazonEC2Client; - private RunInstancesResult runInstancesResult; private DescribeInstancesResult describeInstancesResult; private Reservation reservation; private Instance instance; @@ -59,7 +70,6 @@ public class EC2AutoScalerTest public void setUp() throws Exception { amazonEC2Client = EasyMock.createMock(AmazonEC2Client.class); - runInstancesResult = EasyMock.createMock(RunInstancesResult.class); describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class); reservation = EasyMock.createMock(Reservation.class); @@ -76,7 +86,6 @@ public class EC2AutoScalerTest public void tearDown() throws Exception { EasyMock.verify(amazonEC2Client); - EasyMock.verify(runInstancesResult); EasyMock.verify(describeInstancesResult); EasyMock.verify(reservation); } @@ -84,14 +93,12 @@ public class EC2AutoScalerTest @Test public void testScale() { + RunInstancesResult runInstancesResult = EasyMock.createMock(RunInstancesResult.class); + EC2AutoScaler autoScaler = new EC2AutoScaler( 0, 1, - new EC2EnvironmentConfig( - "us-east-1a", - new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.newArrayList(), "foo", "mySubnet", null, null), - new GalaxyEC2UserData(new DefaultObjectMapper(), "env", "version", "type") - ), + ENV_CONFIG, amazonEC2Client, managementConfig ); @@ -108,10 +115,10 @@ public class EC2AutoScalerTest EasyMock.expect(runInstancesResult.getReservation()).andReturn(reservation).atLeastOnce(); EasyMock.replay(runInstancesResult); - EasyMock.expect(describeInstancesResult.getReservations()).andReturn(Arrays.asList(reservation)).atLeastOnce(); + EasyMock.expect(describeInstancesResult.getReservations()).andReturn(Collections.singletonList(reservation)).atLeastOnce(); EasyMock.replay(describeInstancesResult); - EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).atLeastOnce(); + EasyMock.expect(reservation.getInstances()).andReturn(Collections.singletonList(instance)).atLeastOnce(); EasyMock.replay(reservation); AutoScalingData created = autoScaler.provision(); @@ -123,5 +130,121 @@ public class EC2AutoScalerTest Assert.assertEquals(deleted.getNodeIds().size(), 1); Assert.assertEquals(INSTANCE_ID, deleted.getNodeIds().get(0)); + + EasyMock.verify(runInstancesResult); + } + + @Test + public void testIptoIdLookup() throws Exception + { + EC2AutoScaler autoScaler = new EC2AutoScaler( + 0, + 1, + ENV_CONFIG, + amazonEC2Client, + managementConfig + ); + + final int n = 150; + Assert.assertTrue(n <= 2 * EC2AutoScaler.MAX_AWS_FILTER_VALUES); + + List ips = Lists.transform( + ContiguousSet.create(Range.closedOpen(0, n), DiscreteDomain.integers()).asList(), + Functions.toStringFunction() + ); + + EasyMock.expect(amazonEC2Client.describeInstances( + new DescribeInstancesRequest().withFilters(new Filter( + "private-ip-address", + ips.subList(0, EC2AutoScaler.MAX_AWS_FILTER_VALUES) + )) + )) + .andReturn(describeInstancesResult); + + EasyMock.expect(amazonEC2Client.describeInstances( + new DescribeInstancesRequest().withFilters(new Filter( + "private-ip-address", + ips.subList(EC2AutoScaler.MAX_AWS_FILTER_VALUES, n) + )) + )) + .andReturn(describeInstancesResult); + + EasyMock.replay(amazonEC2Client); + + final Reservation[] chunk1 = new Reservation[EC2AutoScaler.MAX_AWS_FILTER_VALUES]; + Arrays.fill(chunk1, reservation); + final Reservation[] chunk2 = new Reservation[n - EC2AutoScaler.MAX_AWS_FILTER_VALUES]; + Arrays.fill(chunk2, reservation); + EasyMock.expect(describeInstancesResult.getReservations()).andReturn( + Lists.newArrayList(chunk1) + ); + EasyMock.expect(describeInstancesResult.getReservations()).andReturn( + Lists.newArrayList(chunk2) + ); + EasyMock.replay(describeInstancesResult); + + EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).times(n); + EasyMock.replay(reservation); + + List ids = autoScaler.ipToIdLookup(ips); + + Assert.assertEquals(n, ids.size()); + } + + @Test + public void testIdToIpLookup() throws Exception + { + EC2AutoScaler autoScaler = new EC2AutoScaler( + 0, + 1, + ENV_CONFIG, + amazonEC2Client, + managementConfig + ); + + final int n = 150; + Assert.assertTrue(n <= 2 * EC2AutoScaler.MAX_AWS_FILTER_VALUES); + + List ids = Lists.transform( + ContiguousSet.create(Range.closedOpen(0, n), DiscreteDomain.integers()).asList(), + Functions.toStringFunction() + ); + + EasyMock.expect(amazonEC2Client.describeInstances( + new DescribeInstancesRequest().withFilters(new Filter( + "instance-id", + ids.subList(0, EC2AutoScaler.MAX_AWS_FILTER_VALUES) + )) + )) + .andReturn(describeInstancesResult); + + EasyMock.expect(amazonEC2Client.describeInstances( + new DescribeInstancesRequest().withFilters(new Filter( + "instance-id", + ids.subList(EC2AutoScaler.MAX_AWS_FILTER_VALUES, n) + )) + )) + .andReturn(describeInstancesResult); + + EasyMock.replay(amazonEC2Client); + + final Reservation[] chunk1 = new Reservation[EC2AutoScaler.MAX_AWS_FILTER_VALUES]; + Arrays.fill(chunk1, reservation); + final Reservation[] chunk2 = new Reservation[n - EC2AutoScaler.MAX_AWS_FILTER_VALUES]; + Arrays.fill(chunk2, reservation); + EasyMock.expect(describeInstancesResult.getReservations()).andReturn( + Lists.newArrayList(chunk1) + ); + EasyMock.expect(describeInstancesResult.getReservations()).andReturn( + Lists.newArrayList(chunk2) + ); + EasyMock.replay(describeInstancesResult); + + EasyMock.expect(reservation.getInstances()).andReturn(Collections.singletonList(instance)).times(n); + EasyMock.replay(reservation); + + List ips = autoScaler.idToIpLookup(ids); + + Assert.assertEquals(n, ips.size()); } }