mirror of https://github.com/apache/druid.git
EC2 autoscaler: avoid hitting aws filter limits
This commit is contained in:
parent
465cbcf9a7
commit
749ac12f88
|
@ -32,6 +32,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.FluentIterable;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.indexing.overlord.autoscaling.AutoScaler;
|
||||
|
@ -45,6 +46,7 @@ import java.util.List;
|
|||
public class EC2AutoScaler implements AutoScaler<EC2EnvironmentConfig>
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(EC2AutoScaler.class);
|
||||
public static final int MAX_AWS_FILTER_VALUES = 100;
|
||||
|
||||
private final int minNumWorkers;
|
||||
private final int maxNumWorkers;
|
||||
|
@ -244,29 +246,35 @@ public class EC2AutoScaler implements AutoScaler<EC2EnvironmentConfig>
|
|||
@Override
|
||||
public List<String> ipToIdLookup(List<String> ips)
|
||||
{
|
||||
DescribeInstancesResult result = amazonEC2Client.describeInstances(
|
||||
new DescribeInstancesRequest()
|
||||
.withFilters(
|
||||
new Filter("private-ip-address", ips)
|
||||
)
|
||||
);
|
||||
|
||||
List<Instance> instances = Lists.newArrayList();
|
||||
for (Reservation reservation : result.getReservations()) {
|
||||
instances.addAll(reservation.getInstances());
|
||||
}
|
||||
|
||||
List<String> retVal = Lists.transform(
|
||||
instances,
|
||||
new Function<Instance, String>()
|
||||
final List<String> retVal = FluentIterable
|
||||
// chunk requests to avoid hitting default AWS limits on filters
|
||||
.from(Lists.partition(ips, MAX_AWS_FILTER_VALUES))
|
||||
.transformAndConcat(new Function<List<String>, Iterable<Reservation>>()
|
||||
{
|
||||
@Override
|
||||
public String apply(Instance input)
|
||||
public Iterable<Reservation> apply(List<String> input)
|
||||
{
|
||||
return input.getInstanceId();
|
||||
return amazonEC2Client.describeInstances(
|
||||
new DescribeInstancesRequest().withFilters(new Filter("private-ip-address", input))
|
||||
).getReservations();
|
||||
}
|
||||
}
|
||||
);
|
||||
})
|
||||
.transformAndConcat(new Function<Reservation, Iterable<Instance>>()
|
||||
{
|
||||
@Override
|
||||
public Iterable<Instance> apply(Reservation reservation)
|
||||
{
|
||||
return reservation.getInstances();
|
||||
}
|
||||
})
|
||||
.transform(new Function<Instance, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(Instance instance)
|
||||
{
|
||||
return instance.getInstanceId();
|
||||
}
|
||||
}).toList();
|
||||
|
||||
log.debug("Performing lookup: %s --> %s", ips, retVal);
|
||||
|
||||
|
@ -276,29 +284,35 @@ public class EC2AutoScaler implements AutoScaler<EC2EnvironmentConfig>
|
|||
@Override
|
||||
public List<String> idToIpLookup(List<String> nodeIds)
|
||||
{
|
||||
DescribeInstancesResult result = amazonEC2Client.describeInstances(
|
||||
new DescribeInstancesRequest()
|
||||
.withFilters(
|
||||
new Filter("instance-id", nodeIds)
|
||||
)
|
||||
);
|
||||
|
||||
List<Instance> instances = Lists.newArrayList();
|
||||
for (Reservation reservation : result.getReservations()) {
|
||||
instances.addAll(reservation.getInstances());
|
||||
}
|
||||
|
||||
List<String> retVal = Lists.transform(
|
||||
instances,
|
||||
new Function<Instance, String>()
|
||||
final List<String> retVal = FluentIterable
|
||||
// chunk requests to avoid hitting default AWS limits on filters
|
||||
.from(Lists.partition(nodeIds, MAX_AWS_FILTER_VALUES))
|
||||
.transformAndConcat(new Function<List<String>, Iterable<Reservation>>()
|
||||
{
|
||||
@Override
|
||||
public String apply(Instance input)
|
||||
public Iterable<Reservation> apply(List<String> input)
|
||||
{
|
||||
return input.getPrivateIpAddress();
|
||||
return amazonEC2Client.describeInstances(
|
||||
new DescribeInstancesRequest().withFilters(new Filter("instance-id", input))
|
||||
).getReservations();
|
||||
}
|
||||
}
|
||||
);
|
||||
})
|
||||
.transformAndConcat(new Function<Reservation, Iterable<Instance>>()
|
||||
{
|
||||
@Override
|
||||
public Iterable<Instance> apply(Reservation reservation)
|
||||
{
|
||||
return reservation.getInstances();
|
||||
}
|
||||
})
|
||||
.transform(new Function<Instance, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(Instance instance)
|
||||
{
|
||||
return instance.getPrivateIpAddress();
|
||||
}
|
||||
}).toList();
|
||||
|
||||
log.debug("Performing lookup: %s --> %s", nodeIds, retVal);
|
||||
|
||||
|
|
|
@ -20,12 +20,17 @@ package io.druid.indexing.overlord.autoscaling;
|
|||
import com.amazonaws.services.ec2.AmazonEC2Client;
|
||||
import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
|
||||
import com.amazonaws.services.ec2.model.DescribeInstancesResult;
|
||||
import com.amazonaws.services.ec2.model.Filter;
|
||||
import com.amazonaws.services.ec2.model.Instance;
|
||||
import com.amazonaws.services.ec2.model.Reservation;
|
||||
import com.amazonaws.services.ec2.model.RunInstancesRequest;
|
||||
import com.amazonaws.services.ec2.model.RunInstancesResult;
|
||||
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
|
||||
import com.google.common.base.Functions;
|
||||
import com.google.common.collect.ContiguousSet;
|
||||
import com.google.common.collect.DiscreteDomain;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Range;
|
||||
import io.druid.indexing.overlord.autoscaling.ec2.EC2AutoScaler;
|
||||
import io.druid.indexing.overlord.autoscaling.ec2.EC2EnvironmentConfig;
|
||||
import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData;
|
||||
|
@ -38,7 +43,9 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -46,10 +53,14 @@ public class EC2AutoScalerTest
|
|||
{
|
||||
private static final String AMI_ID = "dummy";
|
||||
private static final String INSTANCE_ID = "theInstance";
|
||||
public static final EC2EnvironmentConfig ENV_CONFIG = new EC2EnvironmentConfig(
|
||||
"us-east-1a",
|
||||
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo", "mySubnet", null, null),
|
||||
new GalaxyEC2UserData(new DefaultObjectMapper(), "env", "version", "type")
|
||||
);
|
||||
private static final String IP = "dummyIP";
|
||||
|
||||
private AmazonEC2Client amazonEC2Client;
|
||||
private RunInstancesResult runInstancesResult;
|
||||
private DescribeInstancesResult describeInstancesResult;
|
||||
private Reservation reservation;
|
||||
private Instance instance;
|
||||
|
@ -59,7 +70,6 @@ public class EC2AutoScalerTest
|
|||
public void setUp() throws Exception
|
||||
{
|
||||
amazonEC2Client = EasyMock.createMock(AmazonEC2Client.class);
|
||||
runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
|
||||
describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class);
|
||||
reservation = EasyMock.createMock(Reservation.class);
|
||||
|
||||
|
@ -76,7 +86,6 @@ public class EC2AutoScalerTest
|
|||
public void tearDown() throws Exception
|
||||
{
|
||||
EasyMock.verify(amazonEC2Client);
|
||||
EasyMock.verify(runInstancesResult);
|
||||
EasyMock.verify(describeInstancesResult);
|
||||
EasyMock.verify(reservation);
|
||||
}
|
||||
|
@ -84,14 +93,12 @@ public class EC2AutoScalerTest
|
|||
@Test
|
||||
public void testScale()
|
||||
{
|
||||
RunInstancesResult runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
|
||||
|
||||
EC2AutoScaler autoScaler = new EC2AutoScaler(
|
||||
0,
|
||||
1,
|
||||
new EC2EnvironmentConfig(
|
||||
"us-east-1a",
|
||||
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo", "mySubnet", null, null),
|
||||
new GalaxyEC2UserData(new DefaultObjectMapper(), "env", "version", "type")
|
||||
),
|
||||
ENV_CONFIG,
|
||||
amazonEC2Client,
|
||||
managementConfig
|
||||
);
|
||||
|
@ -108,10 +115,10 @@ public class EC2AutoScalerTest
|
|||
EasyMock.expect(runInstancesResult.getReservation()).andReturn(reservation).atLeastOnce();
|
||||
EasyMock.replay(runInstancesResult);
|
||||
|
||||
EasyMock.expect(describeInstancesResult.getReservations()).andReturn(Arrays.asList(reservation)).atLeastOnce();
|
||||
EasyMock.expect(describeInstancesResult.getReservations()).andReturn(Collections.singletonList(reservation)).atLeastOnce();
|
||||
EasyMock.replay(describeInstancesResult);
|
||||
|
||||
EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).atLeastOnce();
|
||||
EasyMock.expect(reservation.getInstances()).andReturn(Collections.singletonList(instance)).atLeastOnce();
|
||||
EasyMock.replay(reservation);
|
||||
|
||||
AutoScalingData created = autoScaler.provision();
|
||||
|
@ -123,5 +130,121 @@ public class EC2AutoScalerTest
|
|||
|
||||
Assert.assertEquals(deleted.getNodeIds().size(), 1);
|
||||
Assert.assertEquals(INSTANCE_ID, deleted.getNodeIds().get(0));
|
||||
|
||||
EasyMock.verify(runInstancesResult);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIptoIdLookup() throws Exception
|
||||
{
|
||||
EC2AutoScaler autoScaler = new EC2AutoScaler(
|
||||
0,
|
||||
1,
|
||||
ENV_CONFIG,
|
||||
amazonEC2Client,
|
||||
managementConfig
|
||||
);
|
||||
|
||||
final int n = 150;
|
||||
Assert.assertTrue(n <= 2 * EC2AutoScaler.MAX_AWS_FILTER_VALUES);
|
||||
|
||||
List<String> ips = Lists.transform(
|
||||
ContiguousSet.create(Range.closedOpen(0, n), DiscreteDomain.integers()).asList(),
|
||||
Functions.toStringFunction()
|
||||
);
|
||||
|
||||
EasyMock.expect(amazonEC2Client.describeInstances(
|
||||
new DescribeInstancesRequest().withFilters(new Filter(
|
||||
"private-ip-address",
|
||||
ips.subList(0, EC2AutoScaler.MAX_AWS_FILTER_VALUES)
|
||||
))
|
||||
))
|
||||
.andReturn(describeInstancesResult);
|
||||
|
||||
EasyMock.expect(amazonEC2Client.describeInstances(
|
||||
new DescribeInstancesRequest().withFilters(new Filter(
|
||||
"private-ip-address",
|
||||
ips.subList(EC2AutoScaler.MAX_AWS_FILTER_VALUES, n)
|
||||
))
|
||||
))
|
||||
.andReturn(describeInstancesResult);
|
||||
|
||||
EasyMock.replay(amazonEC2Client);
|
||||
|
||||
final Reservation[] chunk1 = new Reservation[EC2AutoScaler.MAX_AWS_FILTER_VALUES];
|
||||
Arrays.fill(chunk1, reservation);
|
||||
final Reservation[] chunk2 = new Reservation[n - EC2AutoScaler.MAX_AWS_FILTER_VALUES];
|
||||
Arrays.fill(chunk2, reservation);
|
||||
EasyMock.expect(describeInstancesResult.getReservations()).andReturn(
|
||||
Lists.newArrayList(chunk1)
|
||||
);
|
||||
EasyMock.expect(describeInstancesResult.getReservations()).andReturn(
|
||||
Lists.newArrayList(chunk2)
|
||||
);
|
||||
EasyMock.replay(describeInstancesResult);
|
||||
|
||||
EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).times(n);
|
||||
EasyMock.replay(reservation);
|
||||
|
||||
List<String> ids = autoScaler.ipToIdLookup(ips);
|
||||
|
||||
Assert.assertEquals(n, ids.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIdToIpLookup() throws Exception
|
||||
{
|
||||
EC2AutoScaler autoScaler = new EC2AutoScaler(
|
||||
0,
|
||||
1,
|
||||
ENV_CONFIG,
|
||||
amazonEC2Client,
|
||||
managementConfig
|
||||
);
|
||||
|
||||
final int n = 150;
|
||||
Assert.assertTrue(n <= 2 * EC2AutoScaler.MAX_AWS_FILTER_VALUES);
|
||||
|
||||
List<String> ids = Lists.transform(
|
||||
ContiguousSet.create(Range.closedOpen(0, n), DiscreteDomain.integers()).asList(),
|
||||
Functions.toStringFunction()
|
||||
);
|
||||
|
||||
EasyMock.expect(amazonEC2Client.describeInstances(
|
||||
new DescribeInstancesRequest().withFilters(new Filter(
|
||||
"instance-id",
|
||||
ids.subList(0, EC2AutoScaler.MAX_AWS_FILTER_VALUES)
|
||||
))
|
||||
))
|
||||
.andReturn(describeInstancesResult);
|
||||
|
||||
EasyMock.expect(amazonEC2Client.describeInstances(
|
||||
new DescribeInstancesRequest().withFilters(new Filter(
|
||||
"instance-id",
|
||||
ids.subList(EC2AutoScaler.MAX_AWS_FILTER_VALUES, n)
|
||||
))
|
||||
))
|
||||
.andReturn(describeInstancesResult);
|
||||
|
||||
EasyMock.replay(amazonEC2Client);
|
||||
|
||||
final Reservation[] chunk1 = new Reservation[EC2AutoScaler.MAX_AWS_FILTER_VALUES];
|
||||
Arrays.fill(chunk1, reservation);
|
||||
final Reservation[] chunk2 = new Reservation[n - EC2AutoScaler.MAX_AWS_FILTER_VALUES];
|
||||
Arrays.fill(chunk2, reservation);
|
||||
EasyMock.expect(describeInstancesResult.getReservations()).andReturn(
|
||||
Lists.newArrayList(chunk1)
|
||||
);
|
||||
EasyMock.expect(describeInstancesResult.getReservations()).andReturn(
|
||||
Lists.newArrayList(chunk2)
|
||||
);
|
||||
EasyMock.replay(describeInstancesResult);
|
||||
|
||||
EasyMock.expect(reservation.getInstances()).andReturn(Collections.singletonList(instance)).times(n);
|
||||
EasyMock.replay(reservation);
|
||||
|
||||
List<String> ips = autoScaler.idToIpLookup(ids);
|
||||
|
||||
Assert.assertEquals(n, ips.size());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue