Bug fixes with EC2 auto scaling

This commit is contained in:
Fangjin Yang 2013-01-09 14:51:35 -08:00
parent 3685839376
commit 4c2da93389
13 changed files with 146 additions and 45 deletions

View File

@ -65,7 +65,7 @@ public class DbConnector
dbi,
workerTableName,
String.format(
"CREATE table %s (minVersion TINYTEXT NOT NULL, minNumWorkers SMALLINT NOT NULL, nodeData LONGTEXT NOT NULL, userData LONGTEXT NOT NULL)",
"CREATE table %s (minVersion TINYTEXT NOT NULL, minNumWorkers SMALLINT NOT NULL, nodeData LONGTEXT NOT NULL, userData LONGTEXT NOT NULL, securityGroupIds LONGTEXT NOT NULL, keyName TINYTEXT NOT NULL)",
workerTableName
)
);

View File

@ -54,6 +54,7 @@ import org.joda.time.Duration;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@ -148,7 +149,7 @@ public class RemoteTaskRunner implements TaskRunner
Worker.class
);
log.info("Worker[%s] removed!", worker.getHost());
removeWorker(worker.getHost());
removeWorker(worker);
}
}
}
@ -222,7 +223,7 @@ public class RemoteTaskRunner implements TaskRunner
}
log.info(
"[%s] still terminating. Wait for all nodes to terminate before trying again.",
"%s still terminating. Wait for all nodes to terminate before trying again.",
currentlyTerminating
);
}
@ -372,7 +373,7 @@ public class RemoteTaskRunner implements TaskRunner
private void addWorker(final Worker worker)
{
try {
currentlyProvisioning.remove(worker.getHost());
currentlyProvisioning.removeAll(strategy.ipLookup(Arrays.<String>asList(worker.getIp())));
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
@ -483,22 +484,22 @@ public class RemoteTaskRunner implements TaskRunner
* When a ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned
* to the worker. If tasks remain, they are retried.
*
* @param workerId - id of the removed worker
* @param worker - the removed worker
*/
private void removeWorker(final String workerId)
private void removeWorker(final Worker worker)
{
currentlyTerminating.remove(workerId);
currentlyTerminating.remove(worker.getHost());
WorkerWrapper workerWrapper = zkWorkers.get(workerId);
WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost());
if (workerWrapper != null) {
try {
Set<String> tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks());
tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), workerId)));
tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), worker.getHost())));
for (String taskId : tasksToRetry) {
TaskWrapper taskWrapper = tasks.get(taskId);
if (taskWrapper != null) {
retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId));
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
}
}
@ -508,7 +509,7 @@ public class RemoteTaskRunner implements TaskRunner
log.error(e, "Failed to cleanly remove worker[%s]");
}
}
zkWorkers.remove(workerId);
zkWorkers.remove(worker.getHost());
}
private WorkerWrapper findWorkerForTask()
@ -558,7 +559,7 @@ public class RemoteTaskRunner implements TaskRunner
}
log.info(
"[%s] still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
currentlyProvisioning
);
}

View File

@ -47,6 +47,9 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
@ -70,9 +73,6 @@ import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;

View File

@ -36,6 +36,7 @@ import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.codec.binary.Base64;
import org.codehaus.jackson.map.ObjectMapper;
import java.util.List;
@ -82,7 +83,15 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
workerConfig.getMaxInstances()
)
.withInstanceType(workerConfig.getInstanceType())
.withUserData(jsonMapper.writeValueAsString(setupData.getUserData()))
.withSecurityGroupIds(workerConfig.getSecurityGroupIds())
.withKeyName(workerConfig.getKeyName())
.withUserData(
Base64.encodeBase64String(
jsonMapper.writeValueAsBytes(
setupData.getUserData()
)
)
)
);
List<String> instanceIds = Lists.transform(
@ -107,7 +116,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
@Override
public String apply(Instance input)
{
return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort());
return input.getInstanceId();
}
}
),
@ -127,7 +136,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
DescribeInstancesResult result = amazonEC2Client.describeInstances(
new DescribeInstancesRequest()
.withFilters(
new Filter("private-ip-address", nodeIds)
new Filter("instance-id", nodeIds)
)
);
@ -139,19 +148,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
try {
log.info("Terminating instance[%s]", instances);
amazonEC2Client.terminateInstances(
new TerminateInstancesRequest(
Lists.transform(
instances,
new Function<Instance, String>()
{
@Override
public String apply(Instance input)
{
return input.getInstanceId();
}
}
)
)
new TerminateInstancesRequest(nodeIds)
);
return new AutoScalingData<Instance>(
@ -175,4 +172,32 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
return null;
}
@Override
public List<String> ipLookup(List<String> ips)
{
  // Translate worker private IP addresses into EC2 instance ids, so callers can
  // reconcile provisioning bookkeeping (instance ids) with ZK worker announcements (IPs).
  DescribeInstancesResult result = amazonEC2Client.describeInstances(
      new DescribeInstancesRequest()
          .withFilters(
              new Filter("private-ip-address", ips)
          )
  );

  // Flatten every reservation in the response and collect the instance ids eagerly.
  List<String> instanceIds = Lists.newArrayList();
  for (Reservation reservation : result.getReservations()) {
    for (Instance instance : reservation.getInstances()) {
      instanceIds.add(instance.getInstanceId());
    }
  }

  return instanceIds;
}
}

View File

@ -43,4 +43,11 @@ public class NoopScalingStrategy implements ScalingStrategy<String>
log.info("If I were a real strategy I'd terminate %s now", nodeIds);
return null;
}
@Override
public List<String> ipLookup(List<String> ips)
{
  // No-op strategy: there are no real EC2 nodes to resolve, so the addresses
  // are handed straight back to the caller unchanged.
  log.info("I'm not a real strategy so I'm returning what I got %s", ips);
  return ips;
}
}

View File

@ -28,4 +28,11 @@ public interface ScalingStrategy<T>
public AutoScalingData<T> provision();
public AutoScalingData<T> terminate(List<String> nodeIds);
/**
* Provides a lookup of IP addresses to node ids.
*
* @param ips the IP addresses to resolve
* @return the node ids corresponding to the given IP addresses
*/
public List<String> ipLookup(List<String> ips);
}

View File

@ -22,6 +22,8 @@ package com.metamx.druid.merger.coordinator.setup;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.List;
/**
*/
public class EC2NodeData implements WorkerNodeData
@ -30,19 +32,25 @@ public class EC2NodeData implements WorkerNodeData
private final String instanceType;
private final int minInstances;
private final int maxInstances;
private final List<String> securityGroupIds;
private final String keyName;
@JsonCreator
public EC2NodeData(
@JsonProperty("amiId") String amiId,
@JsonProperty("instanceType") String instanceType,
@JsonProperty("minInstances") int minInstances,
@JsonProperty("maxInstances") int maxInstances
@JsonProperty("maxInstances") int maxInstances,
@JsonProperty("securityGroupIds") List<String> securityGroupIds,
@JsonProperty("keyName") String keyName
)
{
this.amiId = amiId;
this.instanceType = instanceType;
this.minInstances = minInstances;
this.maxInstances = maxInstances;
this.securityGroupIds = securityGroupIds;
this.keyName = keyName;
}
@JsonProperty
@ -68,4 +76,16 @@ public class EC2NodeData implements WorkerNodeData
{
return maxInstances;
}
@JsonProperty
public List<String> getSecurityGroupIds()
{
return securityGroupIds;
}
@JsonProperty
public String getKeyName()
{
return keyName;
}
}

View File

@ -27,18 +27,18 @@ import org.codehaus.jackson.annotate.JsonProperty;
public class GalaxyUserData implements WorkerUserData
{
public final String env;
public final String ver;
public final String version;
public final String type;
@JsonCreator
public GalaxyUserData(
@JsonProperty("env") String env,
@JsonProperty("ver") String ver,
@JsonProperty("version") String version,
@JsonProperty("type") String type
)
{
this.env = env;
this.ver = ver;
this.version = version;
this.type = type;
}
@ -49,9 +49,9 @@ public class GalaxyUserData implements WorkerUserData
}
@JsonProperty
public String getVer()
public String getVersion()
{
return ver;
return version;
}
@JsonProperty

View File

@ -22,6 +22,8 @@ package com.metamx.druid.merger.coordinator.setup;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.List;
/**
*/
public class WorkerSetupData
@ -30,19 +32,25 @@ public class WorkerSetupData
private final int minNumWorkers;
private final WorkerNodeData nodeData;
private final WorkerUserData userData;
private final List<String> securityGroupIds;
private final String keyName;
@JsonCreator
public WorkerSetupData(
@JsonProperty("minVersion") String minVersion,
@JsonProperty("minNumWorkers") int minNumWorkers,
@JsonProperty("nodeData") WorkerNodeData nodeData,
@JsonProperty("userData") WorkerUserData userData
@JsonProperty("userData") WorkerUserData userData,
@JsonProperty("securityGroupIds") List<String> securityGroupIds,
@JsonProperty("keyName") String keyName
)
{
this.minVersion = minVersion;
this.minNumWorkers = minNumWorkers;
this.nodeData = nodeData;
this.userData = userData;
this.securityGroupIds = securityGroupIds;
this.keyName = keyName;
}
@JsonProperty
@ -68,4 +76,16 @@ public class WorkerSetupData
{
return userData;
}
@JsonProperty
public List<String> getSecurityGroupIds()
{
return securityGroupIds;
}
@JsonProperty
public String getKeyName()
{
return keyName;
}
}

View File

@ -29,6 +29,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
import org.apache.commons.collections.MapUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.Duration;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.FoldController;
@ -122,7 +123,7 @@ public class WorkerSetupManager
{
return handle.createQuery(
String.format(
"SELECT minVersion, minNumWorkers, nodeData, userData FROM %s",
"SELECT minVersion, minNumWorkers, nodeData, userData, securityGroupIds, keyName FROM %s",
config.getWorkerSetupTable()
)
).fold(
@ -150,7 +151,14 @@ public class WorkerSetupManager
jsonMapper.readValue(
MapUtils.getString(stringObjectMap, "userData"),
WorkerUserData.class
)
),
(List<String>) jsonMapper.readValue(
MapUtils.getString(stringObjectMap, "securityGroupIds"),
new TypeReference<List<String>>()
{
}
),
MapUtils.getString(stringObjectMap, "keyName")
)
);
return workerNodeConfigurations;
@ -207,7 +215,7 @@ public class WorkerSetupManager
handle.createStatement(String.format("DELETE FROM %s", config.getWorkerSetupTable())).execute();
handle.createStatement(
String.format(
"INSERT INTO %s (minVersion, minNumWorkers, nodeData, userData) VALUES (:minVersion, :minNumWorkers, :nodeData, :userData)",
"INSERT INTO %s (minVersion, minNumWorkers, nodeData, userData, securityGroupIds, keyName) VALUES (:minVersion, :minNumWorkers, :nodeData, :userData, :securityGroupIds, :keyName)",
config.getWorkerSetupTable()
)
)
@ -215,6 +223,8 @@ public class WorkerSetupManager
.bind("minNumWorkers", value.getMinNumWorkers())
.bind("nodeData", jsonMapper.writeValueAsString(value.getNodeData()))
.bind("userData", jsonMapper.writeValueAsString(value.getUserData()))
.bind("securityGroupIds", jsonMapper.writeValueAsString(value.getSecurityGroupIds()))
.bind("keyName", jsonMapper.writeValueAsString(value.getKeyName()))
.execute();
return null;

View File

@ -343,6 +343,8 @@ public class RemoteTaskRunnerTest
"0",
0,
null,
null,
Lists.<String>newArrayList(),
null
)
);
@ -404,6 +406,12 @@ public class RemoteTaskRunnerTest
{
return null;
}
@Override
// Test stub: identity mapping — treats each supplied ip as already being a node id.
public List<String> ipLookup(List<String> ips)
{
return ips;
}
}
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig

View File

@ -27,6 +27,7 @@ import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.collect.Lists;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
@ -105,8 +106,10 @@ public class EC2AutoScalingStrategyTest
new WorkerSetupData(
"0",
0,
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1),
new GalaxyUserData("env", "ver", "type")
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"),
new GalaxyUserData("env", "version", "type"),
Arrays.asList("foo"),
"foo2"
)
);
EasyMock.replay(workerSetupManager);
@ -133,7 +136,7 @@ public class EC2AutoScalingStrategyTest
Assert.assertEquals(created.getNodeIds().size(), 1);
Assert.assertEquals(created.getNodes().size(), 1);
Assert.assertEquals(String.format("%s:8080", IP), created.getNodeIds().get(0));
Assert.assertEquals("theInstance", created.getNodeIds().get(0));
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyHost"));

View File

@ -84,7 +84,7 @@
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
<version>1.7</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>