updates to autoscaling config based on code review comments

This commit is contained in:
Fangjin Yang 2013-01-14 14:55:04 -08:00
parent 4c2da93389
commit d1f4317af7
14 changed files with 50 additions and 145 deletions

View File

@ -65,7 +65,7 @@ public class DbConnector
dbi, dbi,
workerTableName, workerTableName,
String.format( String.format(
"CREATE table %s (minVersion TINYTEXT NOT NULL, minNumWorkers SMALLINT NOT NULL, nodeData LONGTEXT NOT NULL, userData LONGTEXT NOT NULL, securityGroupIds LONGTEXT NOT NULL, keyName TINYTEXT NOT NULL)", "CREATE table %s (config LONGTEXT NOT NULL)",
workerTableName workerTableName
) )
); );

View File

@ -53,7 +53,6 @@ import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.joda.time.Period; import org.joda.time.Period;
import javax.annotation.Nullable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
@ -185,7 +184,7 @@ public class RemoteTaskRunner implements TaskRunner
new Predicate<WorkerWrapper>() new Predicate<WorkerWrapper>()
{ {
@Override @Override
public boolean apply(@Nullable WorkerWrapper input) public boolean apply(WorkerWrapper input)
{ {
return input.getRunningTasks().isEmpty() return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis() && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
@ -201,9 +200,9 @@ public class RemoteTaskRunner implements TaskRunner
new Function<WorkerWrapper, String>() new Function<WorkerWrapper, String>()
{ {
@Override @Override
public String apply(@Nullable WorkerWrapper input) public String apply(WorkerWrapper input)
{ {
return input.getWorker().getHost(); return input.getWorker().getIp();
} }
} }
) )

View File

@ -38,7 +38,7 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
public abstract DateTime getTerminateResourcesOriginDateTime(); public abstract DateTime getTerminateResourcesOriginDateTime();
@Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion") @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
@Default("1") @Default("10000")
public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion(); public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
@Config("druid.indexer.maxScalingDuration") @Config("druid.indexer.maxScalingDuration")

View File

@ -30,7 +30,6 @@ import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.setup.EC2NodeData; import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
@ -39,6 +38,7 @@ import com.metamx.emitter.EmittingLogger;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import javax.annotation.Nullable;
import java.util.List; import java.util.List;
/** /**
@ -70,10 +70,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
{ {
try { try {
WorkerSetupData setupData = workerSetupManager.getWorkerSetupData(); WorkerSetupData setupData = workerSetupManager.getWorkerSetupData();
if (!(setupData.getNodeData() instanceof EC2NodeData)) { EC2NodeData workerConfig = setupData.getNodeData();
throw new ISE("DB misconfiguration! Node data is an instance of [%s]", setupData.getNodeData().getClass());
}
EC2NodeData workerConfig = (EC2NodeData) setupData.getNodeData();
log.info("Creating new instance(s)..."); log.info("Creating new instance(s)...");
RunInstancesResult result = amazonEC2Client.runInstances( RunInstancesResult result = amazonEC2Client.runInstances(
@ -131,12 +128,12 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
} }
@Override @Override
public AutoScalingData<Instance> terminate(List<String> nodeIds) public AutoScalingData<Instance> terminate(List<String> ids)
{ {
DescribeInstancesResult result = amazonEC2Client.describeInstances( DescribeInstancesResult result = amazonEC2Client.describeInstances(
new DescribeInstancesRequest() new DescribeInstancesRequest()
.withFilters( .withFilters(
new Filter("instance-id", nodeIds) new Filter("private-ip-address", ids)
) )
); );
@ -148,10 +145,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
try { try {
log.info("Terminating instance[%s]", instances); log.info("Terminating instance[%s]", instances);
amazonEC2Client.terminateInstances( amazonEC2Client.terminateInstances(
new TerminateInstancesRequest(nodeIds) new TerminateInstancesRequest(
);
return new AutoScalingData<Instance>(
Lists.transform( Lists.transform(
instances, instances,
new Function<Instance, String>() new Function<Instance, String>()
@ -159,7 +153,22 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
@Override @Override
public String apply(Instance input) public String apply(Instance input)
{ {
return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort()); return input.getInstanceId();
}
}
)
)
);
return new AutoScalingData<Instance>(
Lists.transform(
ids,
new Function<String, String>()
{
@Override
public String apply(@Nullable String input)
{
return String.format("%s:%s", input, config.getWorkerPort());
} }
} }
), ),
@ -188,7 +197,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
instances.addAll(reservation.getInstances()); instances.addAll(reservation.getInstances());
} }
return Lists.transform( List<String> retVal = Lists.transform(
instances, instances,
new Function<Instance, String>() new Function<Instance, String>()
{ {
@ -199,5 +208,9 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
} }
} }
); );
log.info("Performing lookup: %s --> %s", ips, retVal);
return retVal;
} }
} }

View File

@ -27,7 +27,7 @@ public interface ScalingStrategy<T>
{ {
public AutoScalingData<T> provision(); public AutoScalingData<T> provision();
public AutoScalingData<T> terminate(List<String> nodeIds); public AutoScalingData<T> terminate(List<String> ids);
/** /**
* Provides a lookup of ip addresses to node ids * Provides a lookup of ip addresses to node ids

View File

@ -26,7 +26,7 @@ import java.util.List;
/** /**
*/ */
public class EC2NodeData implements WorkerNodeData public class EC2NodeData
{ {
private final String amiId; private final String amiId;
private final String instanceType; private final String instanceType;

View File

@ -24,7 +24,7 @@ import org.codehaus.jackson.annotate.JsonProperty;
/** /**
*/ */
public class GalaxyUserData implements WorkerUserData public class GalaxyUserData
{ {
public final String env; public final String env;
public final String version; public final String version;

View File

@ -1,33 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.setup;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonTypeInfo;
/**
*/
@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, property="type")
@JsonSubTypes(value={
@JsonSubTypes.Type(name="ec2", value=EC2NodeData.class)
})
public interface WorkerNodeData
{
}

View File

@ -30,27 +30,21 @@ public class WorkerSetupData
{ {
private final String minVersion; private final String minVersion;
private final int minNumWorkers; private final int minNumWorkers;
private final WorkerNodeData nodeData; private final EC2NodeData nodeData;
private final WorkerUserData userData; private final GalaxyUserData userData;
private final List<String> securityGroupIds;
private final String keyName;
@JsonCreator @JsonCreator
public WorkerSetupData( public WorkerSetupData(
@JsonProperty("minVersion") String minVersion, @JsonProperty("minVersion") String minVersion,
@JsonProperty("minNumWorkers") int minNumWorkers, @JsonProperty("minNumWorkers") int minNumWorkers,
@JsonProperty("nodeData") WorkerNodeData nodeData, @JsonProperty("nodeData") EC2NodeData nodeData,
@JsonProperty("userData") WorkerUserData userData, @JsonProperty("userData") GalaxyUserData userData
@JsonProperty("securityGroupIds") List<String> securityGroupIds,
@JsonProperty("keyName") String keyName
) )
{ {
this.minVersion = minVersion; this.minVersion = minVersion;
this.minNumWorkers = minNumWorkers; this.minNumWorkers = minNumWorkers;
this.nodeData = nodeData; this.nodeData = nodeData;
this.userData = userData; this.userData = userData;
this.securityGroupIds = securityGroupIds;
this.keyName = keyName;
} }
@JsonProperty @JsonProperty
@ -66,26 +60,14 @@ public class WorkerSetupData
} }
@JsonProperty @JsonProperty
public WorkerNodeData getNodeData() public EC2NodeData getNodeData()
{ {
return nodeData; return nodeData;
} }
@JsonProperty @JsonProperty
public WorkerUserData getUserData() public GalaxyUserData getUserData()
{ {
return userData; return userData;
} }
@JsonProperty
public List<String> getSecurityGroupIds()
{
return securityGroupIds;
}
@JsonProperty
public String getKeyName()
{
return keyName;
}
} }

View File

@ -123,7 +123,7 @@ public class WorkerSetupManager
{ {
return handle.createQuery( return handle.createQuery(
String.format( String.format(
"SELECT minVersion, minNumWorkers, nodeData, userData, securityGroupIds, keyName FROM %s", "SELECT config FROM %s",
config.getWorkerSetupTable() config.getWorkerSetupTable()
) )
).fold( ).fold(
@ -141,24 +141,9 @@ public class WorkerSetupManager
try { try {
// stringObjectMap lowercases and jackson may fail serde // stringObjectMap lowercases and jackson may fail serde
workerNodeConfigurations.add( workerNodeConfigurations.add(
new WorkerSetupData(
MapUtils.getString(stringObjectMap, "minVersion"),
MapUtils.getInteger(stringObjectMap, "minNumWorkers"),
jsonMapper.readValue( jsonMapper.readValue(
MapUtils.getString(stringObjectMap, "nodeData"), MapUtils.getString(stringObjectMap, "config"),
WorkerNodeData.class WorkerSetupData.class
),
jsonMapper.readValue(
MapUtils.getString(stringObjectMap, "userData"),
WorkerUserData.class
),
(List<String>) jsonMapper.readValue(
MapUtils.getString(stringObjectMap, "securityGroupIds"),
new TypeReference<List<String>>()
{
}
),
MapUtils.getString(stringObjectMap, "keyName")
) )
); );
return workerNodeConfigurations; return workerNodeConfigurations;
@ -215,16 +200,11 @@ public class WorkerSetupManager
handle.createStatement(String.format("DELETE FROM %s", config.getWorkerSetupTable())).execute(); handle.createStatement(String.format("DELETE FROM %s", config.getWorkerSetupTable())).execute();
handle.createStatement( handle.createStatement(
String.format( String.format(
"INSERT INTO %s (minVersion, minNumWorkers, nodeData, userData, securityGroupIds, keyName) VALUES (:minVersion, :minNumWorkers, :nodeData, :userData, :securityGroupIds, :keyName)", "INSERT INTO %s (config) VALUES (:config)",
config.getWorkerSetupTable() config.getWorkerSetupTable()
) )
) )
.bind("minVersion", value.getMinVersion()) .bind("config", jsonMapper.writeValueAsString(value))
.bind("minNumWorkers", value.getMinNumWorkers())
.bind("nodeData", jsonMapper.writeValueAsString(value.getNodeData()))
.bind("userData", jsonMapper.writeValueAsString(value.getUserData()))
.bind("securityGroupIds", jsonMapper.writeValueAsString(value.getSecurityGroupIds()))
.bind("keyName", jsonMapper.writeValueAsString(value.getKeyName()))
.execute(); .execute();
return null; return null;

View File

@ -1,33 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.setup;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonTypeInfo;
/**
*/
@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, property="classType")
@JsonSubTypes(value={
@JsonSubTypes.Type(name="galaxy", value=GalaxyUserData.class)
})
public interface WorkerUserData
{
}

View File

@ -41,6 +41,7 @@ public abstract class WorkerConfig
public int getCapacity() public int getCapacity()
{ {
return Runtime.getRuntime().availableProcessors() - 1; return 1;
//return Runtime.getRuntime().availableProcessors() - 1;
} }
} }

View File

@ -343,8 +343,6 @@ public class RemoteTaskRunnerTest
"0", "0",
0, 0,
null, null,
null,
Lists.<String>newArrayList(),
null null
) )
); );

View File

@ -107,9 +107,7 @@ public class EC2AutoScalingStrategyTest
"0", "0",
0, 0,
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"), new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"),
new GalaxyUserData("env", "version", "type"), new GalaxyUserData("env", "version", "type")
Arrays.asList("foo"),
"foo2"
) )
); );
EasyMock.replay(workerSetupManager); EasyMock.replay(workerSetupManager);
@ -138,7 +136,7 @@ public class EC2AutoScalingStrategyTest
Assert.assertEquals(created.getNodes().size(), 1); Assert.assertEquals(created.getNodes().size(), 1);
Assert.assertEquals("theInstance", created.getNodeIds().get(0)); Assert.assertEquals("theInstance", created.getNodeIds().get(0));
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyHost")); AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP"));
Assert.assertEquals(deleted.getNodeIds().size(), 1); Assert.assertEquals(deleted.getNodeIds().size(), 1);
Assert.assertEquals(deleted.getNodes().size(), 1); Assert.assertEquals(deleted.getNodes().size(), 1);