rewrite autoscaling with tests

This commit is contained in:
fjy 2014-11-18 15:41:06 -08:00
parent c91310914b
commit 64719b15e0
35 changed files with 896 additions and 279 deletions

View File

@ -88,7 +88,7 @@ import java.util.concurrent.TimeUnit;
* <p/> * <p/>
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will * The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
* fail. The RemoteTaskRunner depends on another component to create additional worker resources. * fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* For example, {@link io.druid.indexing.overlord.scaling.ResourceManagementScheduler} can take care of these duties. * For example, {@link io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler} can take care of these duties.
* <p/> * <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker. * If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker.
* <p/> * <p/>
@ -530,6 +530,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} else { } else {
// Nothing running this task, announce it in ZK for a worker to run it // Nothing running this task, announce it in ZK for a worker to run it
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask( final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf( ImmutableMap.copyOf(
Maps.transformEntries( Maps.transformEntries(
zkWorkers, zkWorkers,

View File

@ -20,11 +20,14 @@
package io.druid.indexing.overlord; package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerBehaviourConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy; import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -47,7 +50,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
final ZkPathsConfig zkPaths, final ZkPathsConfig zkPaths,
final ObjectMapper jsonMapper, final ObjectMapper jsonMapper,
@Global final HttpClient httpClient, @Global final HttpClient httpClient,
final WorkerSelectStrategy strategy final Supplier<WorkerBehaviourConfig> workerBehaviourConfigSupplier
) )
{ {
this.curator = curator; this.curator = curator;
@ -55,7 +58,17 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
this.zkPaths = zkPaths; this.zkPaths = zkPaths;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.httpClient = httpClient; this.httpClient = httpClient;
this.strategy = strategy; if (workerBehaviourConfigSupplier != null) {
// Backwards compatibility
final WorkerBehaviourConfig workerBehaviourConfig = workerBehaviourConfigSupplier.get();
if (workerBehaviourConfig != null) {
this.strategy = workerBehaviourConfig.getSelectStrategy();
} else {
this.strategy = new FillCapacityWorkerSelectStrategy();
}
} else {
this.strategy = new FillCapacityWorkerSelectStrategy();
}
} }
@Override @Override

View File

@ -35,8 +35,8 @@ import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.overlord.scaling.ResourceManagementScheduler; import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactory; import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;

View File

@ -17,15 +17,29 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.indexing.overlord.autoscaling.ec2.EC2AutoScaler;
import java.util.List; import java.util.List;
/** /**
* The AutoScalingStrategy has the actual methods to provision and terminate worker nodes. * The AutoScaler has the actual methods to provision and terminate worker nodes.
*/ */
public interface AutoScalingStrategy @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = NoopAutoScaler.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "ec2", value = EC2AutoScaler.class)
})
public interface AutoScaler<T>
{ {
public int getMinNumWorkers();
public int getMaxNumWorkers();
public T getEnvConfig();
public AutoScalingData provision(); public AutoScalingData provision();
public AutoScalingData terminate(List<String> ips); public AutoScalingData terminate(List<String> ips);
@ -34,14 +48,18 @@ public interface AutoScalingStrategy
/** /**
* Provides a lookup of ip addresses to node ids * Provides a lookup of ip addresses to node ids
*
* @param ips - nodes IPs * @param ips - nodes IPs
*
* @return node ids * @return node ids
*/ */
public List<String> ipToIdLookup(List<String> ips); public List<String> ipToIdLookup(List<String> ips);
/** /**
* Provides a lookup of node ids to ip addresses * Provides a lookup of node ids to ip addresses
*
* @param nodeIds - nodes ids * @param nodeIds - nodes ids
*
* @return IPs associated with the node * @return IPs associated with the node
*/ */
public List<String> idToIpLookup(List<String> nodeIds); public List<String> idToIpLookup(List<String> nodeIds);

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -17,8 +17,9 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.UOE;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import java.util.List; import java.util.List;
@ -26,9 +27,27 @@ import java.util.List;
/** /**
* This class just logs when scaling should occur. * This class just logs when scaling should occur.
*/ */
public class NoopAutoScalingStrategy implements AutoScalingStrategy public class NoopAutoScaler<Void> implements AutoScaler<Void>
{ {
private static final EmittingLogger log = new EmittingLogger(NoopAutoScalingStrategy.class); private static final EmittingLogger log = new EmittingLogger(NoopAutoScaler.class);
@Override
public int getMinNumWorkers()
{
return 0;
}
@Override
public int getMaxNumWorkers()
{
return 0;
}
@Override
public Void getEnvConfig()
{
throw new UOE("No config for Noop!");
}
@Override @Override
public AutoScalingData provision() public AutoScalingData provision()

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime; import org.joda.time.DateTime;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.indexing.overlord.RemoteTaskRunner; import io.druid.indexing.overlord.RemoteTaskRunner;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker; import io.druid.indexing.overlord.ZkWorker;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue; import com.fasterxml.jackson.annotation.JsonValue;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period; import org.joda.time.Period;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
@ -34,7 +34,7 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker; import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.overlord.setup.WorkerBehaviourConfig;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
@ -48,9 +48,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
{ {
private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class); private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);
private final AutoScalingStrategy autoScalingStrategy;
private final SimpleResourceManagementConfig config; private final SimpleResourceManagementConfig config;
private final Supplier<WorkerSetupData> workerSetupDataRef; private final Supplier<WorkerBehaviourConfig> workerConfigRef;
private final ScalingStats scalingStats; private final ScalingStats scalingStats;
private final Object lock = new Object(); private final Object lock = new Object();
@ -63,14 +62,12 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
@Inject @Inject
public SimpleResourceManagementStrategy( public SimpleResourceManagementStrategy(
AutoScalingStrategy autoScalingStrategy,
SimpleResourceManagementConfig config, SimpleResourceManagementConfig config,
Supplier<WorkerSetupData> workerSetupDataRef Supplier<WorkerBehaviourConfig> workerConfigRef
) )
{ {
this.autoScalingStrategy = autoScalingStrategy;
this.config = config; this.config = config;
this.workerSetupDataRef = workerSetupDataRef; this.workerConfigRef = workerConfigRef;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack()); this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
} }
@ -79,15 +76,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
{ {
synchronized (lock) { synchronized (lock) {
boolean didProvision = false; boolean didProvision = false;
final WorkerSetupData workerSetupData = workerSetupDataRef.get(); final WorkerBehaviourConfig workerConfig = workerConfigRef.get();
if (workerSetupData == null) { if (workerConfig == null) {
log.warn("No workerSetupData available, cannot provision new workers."); log.warn("No workerConfig available, cannot provision new workers.");
return false; return false;
} }
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config); final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size(); final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
final List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup( final List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(
Lists.newArrayList( Lists.newArrayList(
Iterables.<ZkWorker, String>transform( Iterables.<ZkWorker, String>transform(
zkWorkers, zkWorkers,
@ -104,11 +101,11 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
); );
currentlyProvisioning.removeAll(workerNodeIds); currentlyProvisioning.removeAll(workerNodeIds);
updateTargetWorkerCount(workerSetupData, pendingTasks, zkWorkers); updateTargetWorkerCount(workerConfig, pendingTasks, zkWorkers);
int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size()); int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
while (want > 0) { while (want > 0) {
final AutoScalingData provisioned = autoScalingStrategy.provision(); final AutoScalingData provisioned = workerConfig.getAutoScaler().provision();
final List<String> newNodes; final List<String> newNodes;
if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) { if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) {
break; break;
@ -132,7 +129,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
.addData("provisioningCount", currentlyProvisioning.size()) .addData("provisioningCount", currentlyProvisioning.size())
.emit(); .emit();
autoScalingStrategy.terminateWithIds(Lists.newArrayList(currentlyProvisioning)); workerConfig.getAutoScaler().terminateWithIds(Lists.newArrayList(currentlyProvisioning));
currentlyProvisioning.clear(); currentlyProvisioning.clear();
} }
} }
@ -145,15 +142,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers) public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{ {
synchronized (lock) { synchronized (lock) {
final WorkerSetupData workerSetupData = workerSetupDataRef.get(); final WorkerBehaviourConfig workerConfig = workerConfigRef.get();
if (workerSetupData == null) { if (workerConfig == null) {
log.warn("No workerSetupData available, cannot terminate workers."); log.warn("No workerConfig available, cannot terminate workers.");
return false; return false;
} }
boolean didTerminate = false; boolean didTerminate = false;
final Set<String> workerNodeIds = Sets.newHashSet( final Set<String> workerNodeIds = Sets.newHashSet(
autoScalingStrategy.ipToIdLookup( workerConfig.getAutoScaler().ipToIdLookup(
Lists.newArrayList( Lists.newArrayList(
Iterables.transform( Iterables.transform(
zkWorkers, zkWorkers,
@ -179,7 +176,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
currentlyTerminating.clear(); currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting); currentlyTerminating.addAll(stillExisting);
updateTargetWorkerCount(workerSetupData, pendingTasks, zkWorkers); updateTargetWorkerCount(workerConfig, pendingTasks, zkWorkers);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config); final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
if (currentlyTerminating.isEmpty()) { if (currentlyTerminating.isEmpty()) {
@ -211,7 +208,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
Joiner.on(", ").join(laziestWorkerIps) Joiner.on(", ").join(laziestWorkerIps)
); );
final AutoScalingData terminated = autoScalingStrategy.terminate(laziestWorkerIps); final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(laziestWorkerIps);
if (terminated != null) { if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds()); currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime(); lastTerminateTime = new DateTime();
@ -282,7 +279,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
} }
private void updateTargetWorkerCount( private void updateTargetWorkerCount(
final WorkerSetupData workerSetupData, final WorkerBehaviourConfig workerConfig,
final Collection<RemoteTaskRunnerWorkItem> pendingTasks, final Collection<RemoteTaskRunnerWorkItem> pendingTasks,
final Collection<ZkWorker> zkWorkers final Collection<ZkWorker> zkWorkers
) )
@ -293,8 +290,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
createValidWorkerPredicate(config) createValidWorkerPredicate(config)
); );
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config); final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
final int minWorkerCount = workerSetupData.getMinNumWorkers(); final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
final int maxWorkerCount = workerSetupData.getMaxNumWorkers(); final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
if (minWorkerCount > maxWorkerCount) { if (minWorkerCount > maxWorkerCount) {
log.error("Huh? minWorkerCount[%d] > maxWorkerCount[%d]. I give up!", minWorkerCount, maxWorkerCount); log.error("Huh? minWorkerCount[%d] > maxWorkerCount[%d]. I give up!", minWorkerCount, maxWorkerCount);

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling.ec2;
import com.amazonaws.services.ec2.AmazonEC2; import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest; import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
@ -29,53 +29,83 @@ import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.setup.EC2NodeData; import io.druid.indexing.overlord.autoscaling.AutoScaler;
import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.overlord.autoscaling.AutoScalingData;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import java.util.List; import java.util.List;
/** /**
*/ */
public class EC2AutoScalingStrategy implements AutoScalingStrategy public class EC2AutoScaler implements AutoScaler<EC2EnvironmentConfig>
{ {
private static final EmittingLogger log = new EmittingLogger(EC2AutoScalingStrategy.class); private static final EmittingLogger log = new EmittingLogger(EC2AutoScaler.class);
private final int minNumWorkers;
private final int maxNumWorkers;
private final EC2EnvironmentConfig envConfig;
private final AmazonEC2 amazonEC2Client; private final AmazonEC2 amazonEC2Client;
private final SimpleResourceManagementConfig config; private final SimpleResourceManagementConfig config;
private final Supplier<WorkerSetupData> workerSetupDataRef;
@Inject @JsonCreator
public EC2AutoScalingStrategy( public EC2AutoScaler(
AmazonEC2 amazonEC2Client, @JsonProperty("minNumWorkers") int minNumWorkers,
SimpleResourceManagementConfig config, @JsonProperty("maxNumWorkers") int maxNumWorkers,
Supplier<WorkerSetupData> workerSetupDataRef @JsonProperty("envConfig") EC2EnvironmentConfig envConfig,
@JacksonInject AmazonEC2 amazonEC2Client,
@JacksonInject SimpleResourceManagementConfig config
) )
{ {
this.minNumWorkers = minNumWorkers;
this.maxNumWorkers = maxNumWorkers;
this.envConfig = envConfig;
this.amazonEC2Client = amazonEC2Client; this.amazonEC2Client = amazonEC2Client;
this.config = config; this.config = config;
this.workerSetupDataRef = workerSetupDataRef; }
@Override
@JsonProperty
public int getMinNumWorkers()
{
return minNumWorkers;
}
@Override
@JsonProperty
public int getMaxNumWorkers()
{
return maxNumWorkers;
}
@Override
@JsonProperty
public EC2EnvironmentConfig getEnvConfig()
{
return envConfig;
} }
@Override @Override
public AutoScalingData provision() public AutoScalingData provision()
{ {
try { try {
final WorkerSetupData setupData = workerSetupDataRef.get(); final EC2NodeData workerConfig = envConfig.getNodeData();
final EC2NodeData workerConfig = setupData.getNodeData();
final String userDataBase64; final String userDataBase64;
if (setupData.getUserData() == null) { if (envConfig.getUserData() == null) {
userDataBase64 = null; userDataBase64 = null;
} else { } else {
if (config.getWorkerVersion() == null) { if (config.getWorkerVersion() == null) {
userDataBase64 = setupData.getUserData().getUserDataBase64(); userDataBase64 = envConfig.getUserData().getUserDataBase64();
} else { } else {
userDataBase64 = setupData.getUserData().withVersion(config.getWorkerVersion()).getUserDataBase64(); userDataBase64 = envConfig.getUserData()
.withVersion(config.getWorkerVersion())
.getUserDataBase64();
} }
} }
@ -87,7 +117,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
) )
.withInstanceType(workerConfig.getInstanceType()) .withInstanceType(workerConfig.getInstanceType())
.withSecurityGroupIds(workerConfig.getSecurityGroupIds()) .withSecurityGroupIds(workerConfig.getSecurityGroupIds())
.withPlacement(new Placement(setupData.getAvailabilityZone())) .withPlacement(new Placement(envConfig.getAvailabilityZone()))
.withKeyName(workerConfig.getKeyName()) .withKeyName(workerConfig.getKeyName())
.withUserData(userDataBase64) .withUserData(userDataBase64)
); );
@ -253,4 +283,48 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
return retVal; return retVal;
} }
@Override
public String toString()
{
return "EC2AutoScaler{" +
"envConfig=" + envConfig +
", maxNumWorkers=" + maxNumWorkers +
", minNumWorkers=" + minNumWorkers +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EC2AutoScaler that = (EC2AutoScaler) o;
if (maxNumWorkers != that.maxNumWorkers) {
return false;
}
if (minNumWorkers != that.minNumWorkers) {
return false;
}
if (envConfig != null ? !envConfig.equals(that.envConfig) : that.envConfig != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = minNumWorkers;
result = 31 * result + maxNumWorkers;
result = 31 * result + (envConfig != null ? envConfig.hashCode() : 0);
return result;
}
} }

View File

@ -0,0 +1,106 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.autoscaling.ec2;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class EC2EnvironmentConfig
{
private final String availabilityZone;
private final EC2NodeData nodeData;
private final EC2UserData userData;
@JsonCreator
public EC2EnvironmentConfig(
@JsonProperty("availabilityZone") String availabilityZone,
@JsonProperty("nodeData") EC2NodeData nodeData,
@JsonProperty("userData") EC2UserData userData
)
{
this.availabilityZone = availabilityZone;
this.nodeData = nodeData;
this.userData = userData;
}
@JsonProperty
public String getAvailabilityZone()
{
return availabilityZone;
}
@JsonProperty
public EC2NodeData getNodeData()
{
return nodeData;
}
@JsonProperty
public EC2UserData getUserData()
{
return userData;
}
@Override
public String toString()
{
return "EC2EnvironmentConfig{" +
"availabilityZone='" + availabilityZone + '\'' +
", nodeData=" + nodeData +
", userData=" + userData +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EC2EnvironmentConfig that = (EC2EnvironmentConfig) o;
if (availabilityZone != null ? !availabilityZone.equals(that.availabilityZone) : that.availabilityZone != null) {
return false;
}
if (nodeData != null ? !nodeData.equals(that.nodeData) : that.nodeData != null) {
return false;
}
if (userData != null ? !userData.equals(that.userData) : that.userData != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = availabilityZone != null ? availabilityZone.hashCode() : 0;
result = 31 * result + (nodeData != null ? nodeData.hashCode() : 0);
result = 31 * result + (userData != null ? userData.hashCode() : 0);
return result;
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.setup; package io.druid.indexing.overlord.autoscaling.ec2;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
@ -101,4 +101,50 @@ public class EC2NodeData
", keyName='" + keyName + '\'' + ", keyName='" + keyName + '\'' +
'}'; '}';
} }
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EC2NodeData that = (EC2NodeData) o;
if (maxInstances != that.maxInstances) {
return false;
}
if (minInstances != that.minInstances) {
return false;
}
if (amiId != null ? !amiId.equals(that.amiId) : that.amiId != null) {
return false;
}
if (instanceType != null ? !instanceType.equals(that.instanceType) : that.instanceType != null) {
return false;
}
if (keyName != null ? !keyName.equals(that.keyName) : that.keyName != null) {
return false;
}
if (securityGroupIds != null ? !securityGroupIds.equals(that.securityGroupIds) : that.securityGroupIds != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = amiId != null ? amiId.hashCode() : 0;
result = 31 * result + (instanceType != null ? instanceType.hashCode() : 0);
result = 31 * result + minInstances;
result = 31 * result + maxInstances;
result = 31 * result + (securityGroupIds != null ? securityGroupIds.hashCode() : 0);
result = 31 * result + (keyName != null ? keyName.hashCode() : 0);
return result;
}
} }

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.setup; package io.druid.indexing.overlord.autoscaling.ec2;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.setup; package io.druid.indexing.overlord.autoscaling.ec2;
import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
@ -85,6 +85,44 @@ public class GalaxyEC2UserData implements EC2UserData<GalaxyEC2UserData>
} }
} }
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GalaxyEC2UserData that = (GalaxyEC2UserData) o;
if (env != null ? !env.equals(that.env) : that.env != null) {
return false;
}
if (jsonMapper != null ? !jsonMapper.equals(that.jsonMapper) : that.jsonMapper != null) {
return false;
}
if (type != null ? !type.equals(that.type) : that.type != null) {
return false;
}
if (version != null ? !version.equals(that.version) : that.version != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = jsonMapper != null ? jsonMapper.hashCode() : 0;
result = 31 * result + (env != null ? env.hashCode() : 0);
result = 31 * result + (version != null ? version.hashCode() : 0);
result = 31 * result + (type != null ? type.hashCode() : 0);
return result;
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.setup; package io.druid.indexing.overlord.autoscaling.ec2;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
@ -78,6 +78,42 @@ public class StringEC2UserData implements EC2UserData<StringEC2UserData>
return Base64.encodeBase64String(finalData.getBytes(Charsets.UTF_8)); return Base64.encodeBase64String(finalData.getBytes(Charsets.UTF_8));
} }
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StringEC2UserData that = (StringEC2UserData) o;
if (data != null ? !data.equals(that.data) : that.data != null) {
return false;
}
if (version != null ? !version.equals(that.version) : that.version != null) {
return false;
}
if (versionReplacementString != null
? !versionReplacementString.equals(that.versionReplacementString)
: that.versionReplacementString != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = data != null ? data.hashCode() : 0;
result = 31 * result + (versionReplacementString != null ? versionReplacementString.hashCode() : 0);
result = 31 * result + (version != null ? version.hashCode() : 0);
return result;
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -36,14 +36,15 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.metadata.EntryExistsException;
import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue; import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.scaling.ResourceManagementScheduler; import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerBehaviourConfig;
import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.metadata.EntryExistsException;
import io.druid.tasklogs.TaskLogStreamer; import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -77,6 +78,9 @@ public class OverlordResource
private final TaskLogStreamer taskLogStreamer; private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager; private final JacksonConfigManager configManager;
private AtomicReference<WorkerBehaviourConfig> workerConfigRef = null;
@Deprecated
private AtomicReference<WorkerSetupData> workerSetupDataRef = null; private AtomicReference<WorkerSetupData> workerSetupDataRef = null;
@Inject @Inject
@ -185,6 +189,36 @@ public class OverlordResource
); );
} }
@GET
@Path("/worker")
@Produces("application/json")
public Response getWorkerConfig()
{
if (workerConfigRef == null) {
workerConfigRef = configManager.watch(WorkerBehaviourConfig.CONFIG_KEY, WorkerBehaviourConfig.class);
}
return Response.ok(workerConfigRef.get()).build();
}
@POST
@Path("/worker")
@Consumes("application/json")
public Response setWorkerConfig(
final WorkerBehaviourConfig workerBehaviourConfig
)
{
if (!configManager.set(WorkerBehaviourConfig.CONFIG_KEY, workerBehaviourConfig)) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
log.info("Updating Worker configs: %s", workerBehaviourConfig);
return Response.ok().build();
}
@Deprecated
@GET @GET
@Path("/worker/setup") @Path("/worker/setup")
@Produces("application/json") @Produces("application/json")
@ -197,6 +231,7 @@ public class OverlordResource
return Response.ok(workerSetupDataRef.get()).build(); return Response.ok(workerSetupDataRef.get()).build();
} }
@Deprecated
@POST @POST
@Path("/worker/setup") @Path("/worker/setup")
@Consumes("application/json") @Consumes("application/json")

View File

@ -1,9 +1,28 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup; package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.util.Maps; import com.google.common.collect.Maps;
import javax.validation.constraints.NotNull;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -11,13 +30,47 @@ import java.util.Map;
*/ */
public class FillCapacityWithAffinityConfig public class FillCapacityWithAffinityConfig
{ {
@JsonProperty
@NotNull
// key:Datasource, value:[nodeHostNames] // key:Datasource, value:[nodeHostNames]
private Map<String, List<String>> preferences = Maps.newHashMap(); private Map<String, List<String>> affinity = Maps.newHashMap();
public Map<String, List<String>> getPreferences() @JsonCreator
public FillCapacityWithAffinityConfig(
@JsonProperty("affinity") Map<String, List<String>> affinity
)
{ {
return preferences; this.affinity = affinity;
}
@JsonProperty
public Map<String, List<String>> getAffinity()
{
return affinity;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FillCapacityWithAffinityConfig that = (FillCapacityWithAffinityConfig) o;
if (affinity != null
? !Maps.difference(affinity, that.affinity).entriesDiffering().isEmpty()
: that.affinity != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return affinity != null ? affinity.hashCode() : 0;
} }
} }

View File

@ -1,9 +1,29 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup; package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.util.Sets; import com.google.api.client.util.Sets;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker; import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
@ -18,25 +38,28 @@ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWo
private final FillCapacityWithAffinityConfig affinityConfig; private final FillCapacityWithAffinityConfig affinityConfig;
private final Set<String> affinityWorkerHosts = Sets.newHashSet(); private final Set<String> affinityWorkerHosts = Sets.newHashSet();
@JsonCreator
@Inject
public FillCapacityWithAffinityWorkerSelectStrategy( public FillCapacityWithAffinityWorkerSelectStrategy(
FillCapacityWithAffinityConfig affinityConfig, @JsonProperty("affinityConfig") FillCapacityWithAffinityConfig affinityConfig
RemoteTaskRunnerConfig config
) )
{ {
super(config);
this.affinityConfig = affinityConfig; this.affinityConfig = affinityConfig;
for (List<String> affinityWorkers : affinityConfig.getPreferences().values()) { for (List<String> affinityWorkers : affinityConfig.getAffinity().values()) {
for (String affinityWorker : affinityWorkers) { for (String affinityWorker : affinityWorkers) {
this.affinityWorkerHosts.add(affinityWorker); this.affinityWorkerHosts.add(affinityWorker);
} }
} }
} }
@JsonProperty
public FillCapacityWithAffinityConfig getAffinityConfig()
{
return affinityConfig;
}
@Override @Override
public Optional<ImmutableZkWorker> findWorkerForTask( public Optional<ImmutableZkWorker> findWorkerForTask(
ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
) )
{ {
// don't run other datasources on affinity workers; we only want our configured datasources to run on them // don't run other datasources on affinity workers; we only want our configured datasources to run on them
@ -48,9 +71,9 @@ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWo
} }
ImmutableMap<String, ImmutableZkWorker> eligibleWorkers = builder.build(); ImmutableMap<String, ImmutableZkWorker> eligibleWorkers = builder.build();
List<String> workerHosts = affinityConfig.getPreferences().get(task.getDataSource()); List<String> workerHosts = affinityConfig.getAffinity().get(task.getDataSource());
if (workerHosts == null) { if (workerHosts == null) {
return super.findWorkerForTask(eligibleWorkers, task); return super.findWorkerForTask(config, eligibleWorkers, task);
} }
ImmutableMap.Builder<String, ImmutableZkWorker> affinityBuilder = new ImmutableMap.Builder<>(); ImmutableMap.Builder<String, ImmutableZkWorker> affinityBuilder = new ImmutableMap.Builder<>();
@ -63,14 +86,38 @@ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWo
ImmutableMap<String, ImmutableZkWorker> affinityWorkers = affinityBuilder.build(); ImmutableMap<String, ImmutableZkWorker> affinityWorkers = affinityBuilder.build();
if (!affinityWorkers.isEmpty()) { if (!affinityWorkers.isEmpty()) {
Optional<ImmutableZkWorker> retVal = super.findWorkerForTask(affinityWorkers, task); Optional<ImmutableZkWorker> retVal = super.findWorkerForTask(config, affinityWorkers, task);
if (retVal.isPresent()) { if (retVal.isPresent()) {
return retVal; return retVal;
} }
} }
return super.findWorkerForTask(eligibleWorkers, task); return super.findWorkerForTask(config, eligibleWorkers, task);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FillCapacityWithAffinityWorkerSelectStrategy that = (FillCapacityWithAffinityWorkerSelectStrategy) o;
if (affinityConfig != null ? !affinityConfig.equals(that.affinityConfig) : that.affinityConfig != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return affinityConfig != null ? affinityConfig.hashCode() : 0;
} }
} }

View File

@ -23,7 +23,6 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker; import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
@ -35,17 +34,9 @@ import java.util.TreeSet;
*/ */
public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
{ {
private final RemoteTaskRunnerConfig config; @Override
@Inject
public FillCapacityWorkerSelectStrategy(RemoteTaskRunnerConfig config)
{
this.config = config;
}
public Optional<ImmutableZkWorker> findWorkerForTask( public Optional<ImmutableZkWorker> findWorkerForTask(
final ImmutableMap<String, ImmutableZkWorker> zkWorkers, RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
final Task task
) )
{ {
TreeSet<ImmutableZkWorker> sortedWorkers = Sets.newTreeSet( TreeSet<ImmutableZkWorker> sortedWorkers = Sets.newTreeSet(

View File

@ -0,0 +1,95 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.overlord.autoscaling.AutoScaler;
/**
*/
public class WorkerBehaviourConfig
{
public static final String CONFIG_KEY = "worker.config";
private final WorkerSelectStrategy selectStrategy;
private final AutoScaler autoScaler;
@JsonCreator
public WorkerBehaviourConfig(
@JsonProperty("selectStrategy") WorkerSelectStrategy selectStrategy,
@JsonProperty("autoScaler") AutoScaler autoScaler
)
{
this.selectStrategy = selectStrategy;
this.autoScaler = autoScaler;
}
@JsonProperty
public WorkerSelectStrategy getSelectStrategy()
{
return selectStrategy;
}
@JsonProperty
public AutoScaler getAutoScaler()
{
return autoScaler;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WorkerBehaviourConfig that = (WorkerBehaviourConfig) o;
if (autoScaler != null ? !autoScaler.equals(that.autoScaler) : that.autoScaler != null) {
return false;
}
if (selectStrategy != null ? !selectStrategy.equals(that.selectStrategy) : that.selectStrategy != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = selectStrategy != null ? selectStrategy.hashCode() : 0;
result = 31 * result + (autoScaler != null ? autoScaler.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "WorkerConfiguration{" +
"selectStrategy=" + selectStrategy +
", autoScaler=" + autoScaler +
'}';
}
}

View File

@ -19,25 +19,35 @@
package io.druid.indexing.overlord.setup; package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker; import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
/** /**
* The {@link io.druid.indexing.overlord.RemoteTaskRunner} uses this class to select a worker to assign tasks to. * The {@link io.druid.indexing.overlord.RemoteTaskRunner} uses this class to select a worker to assign tasks to.
*/ */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FillCapacityWorkerSelectStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "fillCapacityWithAffinity", value = FillCapacityWithAffinityWorkerSelectStrategy.class)
})
public interface WorkerSelectStrategy public interface WorkerSelectStrategy
{ {
/** /**
* Customizable logic for selecting a worker to run a task. * Customizable logic for selecting a worker to run a task.
* *
* @param config A config for running remote tasks
* @param zkWorkers An immutable map of workers to choose from. * @param zkWorkers An immutable map of workers to choose from.
* @param task The task to assign. * @param task The task to assign.
* *
* @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available. * @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available.
*/ */
public Optional<ImmutableZkWorker> findWorkerForTask( public Optional<ImmutableZkWorker> findWorkerForTask(
final RemoteTaskRunnerConfig config,
final ImmutableMap<String, ImmutableZkWorker> zkWorkers, final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
final Task task final Task task
); );

View File

@ -21,9 +21,12 @@ package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData;
import io.druid.indexing.overlord.autoscaling.ec2.EC2UserData;
/** /**
*/ */
@Deprecated
public class WorkerSetupData public class WorkerSetupData
{ {
public static final String CONFIG_KEY = "worker.setup"; public static final String CONFIG_KEY = "worker.setup";

View File

@ -404,7 +404,7 @@ public class RemoteTaskRunnerTest
cf, cf,
new SimplePathChildrenCacheFactory.Builder().build(), new SimplePathChildrenCacheFactory.Builder().build(),
null, null,
new FillCapacityWorkerSelectStrategy(config) new FillCapacityWorkerSelectStrategy()
); );
remoteTaskRunner.start(); remoteTaskRunner.start();

View File

@ -21,15 +21,16 @@ package io.druid.indexing.overlord;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.TestUtils;
import io.druid.indexing.overlord.setup.EC2UserData; import io.druid.indexing.overlord.autoscaling.ec2.EC2UserData;
import io.druid.indexing.overlord.setup.GalaxyEC2UserData; import io.druid.indexing.overlord.autoscaling.ec2.GalaxyEC2UserData;
import io.druid.indexing.overlord.setup.StringEC2UserData; import io.druid.indexing.overlord.autoscaling.ec2.StringEC2UserData;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
@Deprecated
public class WorkerSetupDataTest public class WorkerSetupDataTest
{ {
@Test @Test

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.amazonaws.services.ec2.AmazonEC2Client; import com.amazonaws.services.ec2.AmazonEC2Client;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest; import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
@ -28,10 +28,10 @@ import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.druid.common.guava.DSuppliers; import io.druid.indexing.overlord.autoscaling.ec2.EC2AutoScaler;
import io.druid.indexing.overlord.setup.EC2NodeData; import io.druid.indexing.overlord.autoscaling.ec2.EC2EnvironmentConfig;
import io.druid.indexing.overlord.setup.GalaxyEC2UserData; import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData;
import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.overlord.autoscaling.ec2.GalaxyEC2UserData;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.junit.After; import org.junit.After;
@ -41,11 +41,10 @@ import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;
/** /**
*/ */
public class EC2AutoScalingStrategyTest public class EC2AutoScalerTest
{ {
private static final String AMI_ID = "dummy"; private static final String AMI_ID = "dummy";
private static final String INSTANCE_ID = "theInstance"; private static final String INSTANCE_ID = "theInstance";
@ -56,8 +55,7 @@ public class EC2AutoScalingStrategyTest
private DescribeInstancesResult describeInstancesResult; private DescribeInstancesResult describeInstancesResult;
private Reservation reservation; private Reservation reservation;
private Instance instance; private Instance instance;
private EC2AutoScalingStrategy strategy; private SimpleResourceManagementConfig managementConfig;
private AtomicReference<WorkerSetupData> workerSetupData;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
@ -66,7 +64,6 @@ public class EC2AutoScalingStrategyTest
runInstancesResult = EasyMock.createMock(RunInstancesResult.class); runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class); describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class);
reservation = EasyMock.createMock(Reservation.class); reservation = EasyMock.createMock(Reservation.class);
workerSetupData = new AtomicReference<WorkerSetupData>(null);
instance = new Instance() instance = new Instance()
.withInstanceId(INSTANCE_ID) .withInstanceId(INSTANCE_ID)
@ -74,11 +71,7 @@ public class EC2AutoScalingStrategyTest
.withImageId(AMI_ID) .withImageId(AMI_ID)
.withPrivateIpAddress(IP); .withPrivateIpAddress(IP);
strategy = new EC2AutoScalingStrategy( managementConfig = new SimpleResourceManagementConfig().setWorkerPort(8080).setWorkerVersion("");
amazonEC2Client,
new SimpleResourceManagementConfig().setWorkerPort(8080).setWorkerVersion(""),
DSuppliers.of(workerSetupData)
);
} }
@After @After
@ -93,14 +86,16 @@ public class EC2AutoScalingStrategyTest
@Test @Test
public void testScale() public void testScale()
{ {
workerSetupData.set( EC2AutoScaler autoScaler = new EC2AutoScaler(
new WorkerSetupData( 0,
0, 1,
1, new EC2EnvironmentConfig(
"", "us-east-1a",
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"), new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"),
new GalaxyEC2UserData(new DefaultObjectMapper(), "env", "version", "type") new GalaxyEC2UserData(new DefaultObjectMapper(), "env", "version", "type")
) ),
amazonEC2Client,
managementConfig
); );
EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn( EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn(
@ -121,12 +116,12 @@ public class EC2AutoScalingStrategyTest
EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).atLeastOnce(); EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).atLeastOnce();
EasyMock.replay(reservation); EasyMock.replay(reservation);
AutoScalingData created = strategy.provision(); AutoScalingData created = autoScaler.provision();
Assert.assertEquals(created.getNodeIds().size(), 1); Assert.assertEquals(created.getNodeIds().size(), 1);
Assert.assertEquals("theInstance", created.getNodeIds().get(0)); Assert.assertEquals("theInstance", created.getNodeIds().get(0));
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP")); AutoScalingData deleted = autoScaler.terminate(Arrays.asList("dummyIP"));
Assert.assertEquals(deleted.getNodeIds().size(), 1); Assert.assertEquals(deleted.getNodeIds().size(), 1);
Assert.assertEquals(INSTANCE_ID, deleted.getNodeIds().get(0)); Assert.assertEquals(INSTANCE_ID, deleted.getNodeIds().get(0));

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.autoscaling;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -32,7 +32,7 @@ import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker; import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.overlord.setup.WorkerBehaviourConfig;
import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker; import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
@ -55,20 +55,16 @@ import java.util.concurrent.atomic.AtomicReference;
*/ */
public class SimpleResourceManagementStrategyTest public class SimpleResourceManagementStrategyTest
{ {
private AutoScalingStrategy autoScalingStrategy; private AutoScaler autoScaler;
private Task testTask; private Task testTask;
private SimpleResourceManagementConfig simpleResourceManagementConfig;
private SimpleResourceManagementStrategy simpleResourceManagementStrategy; private SimpleResourceManagementStrategy simpleResourceManagementStrategy;
private AtomicReference<WorkerSetupData> workerSetupData; private AtomicReference<WorkerBehaviourConfig> workerConfig;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class); autoScaler = EasyMock.createMock(AutoScaler.class);
workerSetupData = new AtomicReference<>(
new WorkerSetupData(
0, 2, null, null, null
)
);
testTask = new TestMergeTask( testTask = new TestMergeTask(
"task1", "task1",
@ -88,27 +84,38 @@ public class SimpleResourceManagementStrategyTest
), ),
Lists.<AggregatorFactory>newArrayList() Lists.<AggregatorFactory>newArrayList()
); );
simpleResourceManagementConfig = new SimpleResourceManagementConfig()
.setWorkerIdleTimeout(new Period(0))
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(1)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion("");
workerConfig = new AtomicReference<>(
new WorkerBehaviourConfig(
null,
autoScaler
)
);
simpleResourceManagementStrategy = new SimpleResourceManagementStrategy( simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
autoScalingStrategy, simpleResourceManagementConfig,
new SimpleResourceManagementConfig() DSuppliers.of(workerConfig)
.setWorkerIdleTimeout(new Period(0))
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(1)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion(""),
DSuppliers.of(workerSetupData)
); );
} }
@Test @Test
public void testSuccessfulProvision() throws Exception public void testSuccessfulProvision() throws Exception
{ {
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()); .andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.provision()).andReturn( EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("aNode")) new AutoScalingData(Lists.<String>newArrayList("aNode"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -125,18 +132,20 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
); );
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScaler);
} }
@Test @Test
public void testSomethingProvisioning() throws Exception public void testSomethingProvisioning() throws Exception
{ {
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2); .andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScalingStrategy.provision()).andReturn( EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake")) new AutoScalingData(Lists.<String>newArrayList("fake"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -172,7 +181,7 @@ public class SimpleResourceManagementStrategyTest
createdTime.equals(anotherCreatedTime) createdTime.equals(anotherCreatedTime)
); );
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScaler);
} }
@Test @Test
@ -184,14 +193,16 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expectLastCall(); EasyMock.expectLastCall();
EasyMock.replay(emitter); EasyMock.replay(emitter);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2); .andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScalingStrategy.terminateWithIds(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.terminateWithIds(EasyMock.<List<String>>anyObject()))
.andReturn(null); .andReturn(null);
EasyMock.expect(autoScalingStrategy.provision()).andReturn( EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake")) new AutoScalingData(Lists.<String>newArrayList("fake"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -229,21 +240,21 @@ public class SimpleResourceManagementStrategyTest
createdTime.equals(anotherCreatedTime) createdTime.equals(anotherCreatedTime)
); );
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScaler);
EasyMock.verify(emitter); EasyMock.verify(emitter);
} }
@Test @Test
public void testDoSuccessfulTerminate() throws Exception public void testDoSuccessfulTerminate() throws Exception
{ {
workerSetupData.set(new WorkerSetupData(0, 1, null, null, null)); EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(1);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()); .andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn( EasyMock.expect(autoScaler.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList()) new AutoScalingData(Lists.<String>newArrayList())
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -260,20 +271,20 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
); );
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScaler);
} }
@Test @Test
public void testSomethingTerminating() throws Exception public void testSomethingTerminating() throws Exception
{ {
workerSetupData.set(new WorkerSetupData(0, 1, null, null, null)); EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(1).times(2);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2); .andReturn(Lists.<String>newArrayList("ip")).times(2);
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn( EasyMock.expect(autoScaler.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList("ip")) new AutoScalingData(Lists.<String>newArrayList("ip"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -305,16 +316,18 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
); );
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScaler);
} }
@Test @Test
public void testNoActionNeeded() throws Exception public void testNoActionNeeded() throws Exception
{ {
EasyMock.reset(autoScalingStrategy); EasyMock.reset(autoScaler);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")); .andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -327,12 +340,14 @@ public class SimpleResourceManagementStrategyTest
); );
Assert.assertFalse(terminatedSomething); Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScaler);
EasyMock.reset(autoScalingStrategy); EasyMock.reset(autoScaler);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")); .andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -345,17 +360,19 @@ public class SimpleResourceManagementStrategyTest
); );
Assert.assertFalse(provisionedSomething); Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScaler);
} }
@Test @Test
public void testMinCountIncrease() throws Exception public void testMinCountIncrease() throws Exception
{ {
// Don't terminate anything // Don't terminate anything
EasyMock.reset(autoScalingStrategy); EasyMock.reset(autoScaler);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")); .andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(), Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
@ -363,13 +380,15 @@ public class SimpleResourceManagementStrategyTest
) )
); );
Assert.assertFalse(terminatedSomething); Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScaler);
// Don't provision anything // Don't provision anything
EasyMock.reset(autoScalingStrategy); EasyMock.reset(autoScaler);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")); .andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(), Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
@ -377,22 +396,22 @@ public class SimpleResourceManagementStrategyTest
) )
); );
Assert.assertFalse(provisionedSomething); Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScaler);
EasyMock.reset(autoScaler);
// Increase minNumWorkers // Increase minNumWorkers
workerSetupData.set(new WorkerSetupData(3, 5, null, null, null)); EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
// Should provision two new workers EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")); .andReturn(Lists.<String>newArrayList("ip"));
EasyMock.expect(autoScalingStrategy.provision()).andReturn( EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h3")) new AutoScalingData(Lists.<String>newArrayList("h3"))
); );
EasyMock.expect(autoScalingStrategy.provision()).andReturn( // Should provision two new workers
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h4")) new AutoScalingData(Lists.<String>newArrayList("h4"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScaler);
provisionedSomething = simpleResourceManagementStrategy.doProvision( provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(), Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
@ -400,14 +419,14 @@ public class SimpleResourceManagementStrategyTest
) )
); );
Assert.assertTrue(provisionedSomething); Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScaler);
} }
@Test @Test
public void testNullWorkerSetupData() throws Exception public void testNullWorkerConfig() throws Exception
{ {
workerSetupData.set(null); workerConfig.set(null);
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -430,7 +449,7 @@ public class SimpleResourceManagementStrategyTest
Assert.assertFalse(terminatedSomething); Assert.assertFalse(terminatedSomething);
Assert.assertFalse(provisionedSomething); Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScaler);
} }
private static class TestZkWorker extends ZkWorker private static class TestZkWorker extends ZkWorker

View File

@ -11,8 +11,6 @@ import junit.framework.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class FillCapacityWithAffinityWorkerSelectStrategyTest public class FillCapacityWithAffinityWorkerSelectStrategyTest
{ {
@ -20,18 +18,11 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testFindWorkerForTask() throws Exception public void testFindWorkerForTask() throws Exception
{ {
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy( FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig() new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
{
@Override
public Map<String, List<String>> getPreferences()
{
return ImmutableMap.of("foo", Arrays.asList("localhost"));
}
},
new RemoteTaskRunnerConfig()
); );
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask( Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of( ImmutableMap.of(
"lhost", "lhost",
new ImmutableZkWorker( new ImmutableZkWorker(
@ -61,18 +52,11 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testFindWorkerForTaskWithNulls() throws Exception public void testFindWorkerForTaskWithNulls() throws Exception
{ {
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy( FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig() new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
{
@Override
public Map<String, List<String>> getPreferences()
{
return ImmutableMap.of("foo", Arrays.asList("localhost"));
}
},
new RemoteTaskRunnerConfig()
); );
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask( Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of( ImmutableMap.of(
"lhost", "lhost",
new ImmutableZkWorker( new ImmutableZkWorker(
@ -95,18 +79,11 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testIsolation() throws Exception public void testIsolation() throws Exception
{ {
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy( FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig() new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
{
@Override
public Map<String, List<String>> getPreferences()
{
return ImmutableMap.of("foo", Arrays.asList("localhost"));
}
},
new RemoteTaskRunnerConfig()
); );
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask( Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of( ImmutableMap.of(
"localhost", "localhost",
new ImmutableZkWorker( new ImmutableZkWorker(

View File

@ -0,0 +1,87 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.overlord.autoscaling.ec2.EC2AutoScaler;
import io.druid.indexing.overlord.autoscaling.ec2.EC2EnvironmentConfig;
import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData;
import io.druid.indexing.overlord.autoscaling.ec2.StringEC2UserData;
import io.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.junit.Test;
import java.util.Arrays;
public class WorkerBehaviourConfigTest
{
@Test
public void testSerde() throws Exception
{
WorkerBehaviourConfig config = new WorkerBehaviourConfig(
new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(
ImmutableMap.of("foo", Arrays.asList("localhost"))
)
),
new EC2AutoScaler(
7,
11,
new EC2EnvironmentConfig(
"us-east-1a",
new EC2NodeData(
"amiid",
"instanceType",
3,
5,
Arrays.asList("securityGroupIds"),
"keyNames"
),
new StringEC2UserData(
"availZone",
"replace",
"version"
)
),
null,
null
)
);
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues()
{
@Override
public Object findInjectableValue(
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
)
{
return null;
}
}
);
Assert.assertEquals(config, mapper.readValue(mapper.writeValueAsBytes(config), WorkerBehaviourConfig.class));
}
}

View File

@ -35,7 +35,6 @@ import io.druid.client.indexing.IndexingServiceSelectorConfig;
import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.IndexingServiceTaskLogsModule; import io.druid.guice.IndexingServiceTaskLogsModule;
import io.druid.guice.JacksonConfigProvider;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton; import io.druid.guice.LazySingleton;
@ -59,22 +58,15 @@ import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskRunnerFactory; import io.druid.indexing.overlord.TaskRunnerFactory;
import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactoryImpl;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.overlord.http.OverlordRedirectInfo; import io.druid.indexing.overlord.http.OverlordRedirectInfo;
import io.druid.indexing.overlord.http.OverlordResource; import io.druid.indexing.overlord.http.OverlordResource;
import io.druid.indexing.overlord.scaling.AutoScalingStrategy;
import io.druid.indexing.overlord.scaling.EC2AutoScalingStrategy;
import io.druid.indexing.overlord.scaling.NoopAutoScalingStrategy;
import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactory;
import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactoryImpl;
import io.druid.indexing.overlord.scaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.scaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.scaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.setup.FillCapacityWithAffinityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.worker.config.WorkerConfig; import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.http.RedirectFilter; import io.druid.server.http.RedirectFilter;
@ -204,24 +196,6 @@ public class CliOverlord extends ServerRunnable
biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class); biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class); binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
PolyBind.createChoice(
binder,
"druid.indexer.runner.workerSelectStrategy.type",
Key.get(WorkerSelectStrategy.class),
Key.get(FillCapacityWorkerSelectStrategy.class)
);
final MapBinder<String, WorkerSelectStrategy> stratBinder = PolyBind.optionBinder(
binder,
Key.get(WorkerSelectStrategy.class)
);
stratBinder.addBinding("fillCapacity").to(FillCapacityWorkerSelectStrategy.class);
binder.bind(FillCapacityWorkerSelectStrategy.class).in(LazySingleton.class);
stratBinder.addBinding("fillCapacityWithPreference")
.to(FillCapacityWithAffinityWorkerSelectStrategy.class);
binder.bind(FillCapacityWithAffinityWorkerSelectStrategy.class).in(LazySingleton.class);
} }
private void configureAutoscale(Binder binder) private void configureAutoscale(Binder binder)
@ -231,24 +205,6 @@ public class CliOverlord extends ServerRunnable
.to(SimpleResourceManagementStrategy.class) .to(SimpleResourceManagementStrategy.class)
.in(LazySingleton.class); .in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null);
PolyBind.createChoice(
binder,
"druid.indexer.autoscale.strategy",
Key.get(AutoScalingStrategy.class),
Key.get(NoopAutoScalingStrategy.class)
);
final MapBinder<String, AutoScalingStrategy> autoScalingBinder = PolyBind.optionBinder(
binder, Key.get(AutoScalingStrategy.class)
);
autoScalingBinder.addBinding("ec2").to(EC2AutoScalingStrategy.class);
binder.bind(EC2AutoScalingStrategy.class).in(LazySingleton.class);
autoScalingBinder.addBinding("noop").to(NoopAutoScalingStrategy.class);
binder.bind(NoopAutoScalingStrategy.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
} }
}, },