Merge pull request #879 from metamx/rtr-with-pref

Rewrite autoscaling and enable easier configuration of worker selection and autoscaling behaviour
This commit is contained in:
Fangjin Yang 2014-11-24 17:54:28 -07:00
commit 3ff569ef2d
35 changed files with 1098 additions and 236 deletions

View File

@ -90,7 +90,7 @@ import java.util.concurrent.TimeUnit;
* <p/>
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* For example, {@link io.druid.indexing.overlord.scaling.ResourceManagementScheduler} can take care of these duties.
* For example, {@link io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler} can take care of these duties.
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker.
* <p/>
@ -532,6 +532,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} else {
// Nothing running this task, announce it in ZK for a worker to run it
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf(
Maps.transformEntries(
zkWorkers,

View File

@ -20,11 +20,14 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
@ -47,7 +50,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
final ZkPathsConfig zkPaths,
final ObjectMapper jsonMapper,
@Global final HttpClient httpClient,
final WorkerSelectStrategy strategy
final Supplier<WorkerBehaviorConfig> workerBehaviourConfigSupplier
)
{
this.curator = curator;
@ -55,7 +58,17 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
this.zkPaths = zkPaths;
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.strategy = strategy;
if (workerBehaviourConfigSupplier != null) {
// Backwards compatibility
final WorkerBehaviorConfig workerBehaviorConfig = workerBehaviourConfigSupplier.get();
if (workerBehaviorConfig != null) {
this.strategy = workerBehaviorConfig.getSelectStrategy();
} else {
this.strategy = new FillCapacityWorkerSelectStrategy();
}
} else {
this.strategy = new FillCapacityWorkerSelectStrategy();
}
}
@Override

View File

@ -35,8 +35,8 @@ import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactory;
import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
import io.druid.server.DruidNode;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;

View File

@ -17,15 +17,29 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.indexing.overlord.autoscaling.ec2.EC2AutoScaler;
import java.util.List;
/**
* The AutoScalingStrategy has the actual methods to provision and terminate worker nodes.
* The AutoScaler has the actual methods to provision and terminate worker nodes.
*/
public interface AutoScalingStrategy
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = NoopAutoScaler.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "ec2", value = EC2AutoScaler.class)
})
public interface AutoScaler<T>
{
public int getMinNumWorkers();
public int getMaxNumWorkers();
public T getEnvConfig();
public AutoScalingData provision();
public AutoScalingData terminate(List<String> ips);
@ -34,14 +48,18 @@ public interface AutoScalingStrategy
/**
* Provides a lookup of ip addresses to node ids
*
* @param ips - nodes IPs
*
* @return node ids
*/
public List<String> ipToIdLookup(List<String> ips);
/**
* Provides a lookup of node ids to ip addresses
*
* @param nodeIds - nodes ids
*
* @return IPs associated with the node
*/
public List<String> idToIpLookup(List<String> nodeIds);

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -17,8 +17,9 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.UOE;
import com.metamx.emitter.EmittingLogger;
import java.util.List;
@ -26,9 +27,27 @@ import java.util.List;
/**
* This class just logs when scaling should occur.
*/
public class NoopAutoScalingStrategy implements AutoScalingStrategy
public class NoopAutoScaler<Void> implements AutoScaler<Void>
{
private static final EmittingLogger log = new EmittingLogger(NoopAutoScalingStrategy.class);
private static final EmittingLogger log = new EmittingLogger(NoopAutoScaler.class);
@Override
public int getMinNumWorkers()
{
return 0;
}
@Override
public int getMaxNumWorkers()
{
return 0;
}
@Override
public Void getEnvConfig()
{
throw new UOE("No config for Noop!");
}
@Override
public AutoScalingData provision()

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.logger.Logger;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.indexing.overlord.RemoteTaskRunner;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@ -34,7 +34,7 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@ -48,9 +48,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
{
private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);
private final AutoScalingStrategy autoScalingStrategy;
private final SimpleResourceManagementConfig config;
private final Supplier<WorkerSetupData> workerSetupDataRef;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ScalingStats scalingStats;
private final Object lock = new Object();
@ -63,14 +62,12 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
@Inject
public SimpleResourceManagementStrategy(
AutoScalingStrategy autoScalingStrategy,
SimpleResourceManagementConfig config,
Supplier<WorkerSetupData> workerSetupDataRef
Supplier<WorkerBehaviorConfig> workerConfigRef
)
{
this.autoScalingStrategy = autoScalingStrategy;
this.config = config;
this.workerSetupDataRef = workerSetupDataRef;
this.workerConfigRef = workerConfigRef;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
}
@ -79,15 +76,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
{
synchronized (lock) {
boolean didProvision = false;
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
if (workerSetupData == null) {
log.warn("No workerSetupData available, cannot provision new workers.");
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null) {
log.warn("No workerConfig available, cannot provision new workers.");
return false;
}
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
final List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
final List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(
Lists.newArrayList(
Iterables.<ZkWorker, String>transform(
zkWorkers,
@ -104,11 +101,11 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
);
currentlyProvisioning.removeAll(workerNodeIds);
updateTargetWorkerCount(workerSetupData, pendingTasks, zkWorkers);
updateTargetWorkerCount(workerConfig, pendingTasks, zkWorkers);
int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
while (want > 0) {
final AutoScalingData provisioned = autoScalingStrategy.provision();
final AutoScalingData provisioned = workerConfig.getAutoScaler().provision();
final List<String> newNodes;
if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) {
break;
@ -132,7 +129,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
autoScalingStrategy.terminateWithIds(Lists.newArrayList(currentlyProvisioning));
workerConfig.getAutoScaler().terminateWithIds(Lists.newArrayList(currentlyProvisioning));
currentlyProvisioning.clear();
}
}
@ -145,15 +142,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
synchronized (lock) {
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
if (workerSetupData == null) {
log.warn("No workerSetupData available, cannot terminate workers.");
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null) {
log.warn("No workerConfig available, cannot terminate workers.");
return false;
}
boolean didTerminate = false;
final Set<String> workerNodeIds = Sets.newHashSet(
autoScalingStrategy.ipToIdLookup(
workerConfig.getAutoScaler().ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
zkWorkers,
@ -179,9 +176,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);
updateTargetWorkerCount(workerSetupData, pendingTasks, zkWorkers);
updateTargetWorkerCount(workerConfig, pendingTasks, zkWorkers);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
if (currentlyTerminating.isEmpty()) {
final int excessWorkers = (zkWorkers.size() + currentlyProvisioning.size()) - targetWorkerCount;
if (excessWorkers > 0) {
@ -211,7 +208,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
Joiner.on(", ").join(laziestWorkerIps)
);
final AutoScalingData terminated = autoScalingStrategy.terminate(laziestWorkerIps);
final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(laziestWorkerIps);
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
@ -246,11 +243,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
private static Predicate<ZkWorker> createLazyWorkerPredicate(
final SimpleResourceManagementConfig config,
final WorkerSetupData workerSetupData
final SimpleResourceManagementConfig config
)
{
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config);
return new Predicate<ZkWorker>()
{
@ -265,8 +261,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
private static Predicate<ZkWorker> createValidWorkerPredicate(
final SimpleResourceManagementConfig config,
final WorkerSetupData workerSetupData
final SimpleResourceManagementConfig config
)
{
return new Predicate<ZkWorker>()
@ -284,7 +279,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
private void updateTargetWorkerCount(
final WorkerSetupData workerSetupData,
final WorkerBehaviorConfig workerConfig,
final Collection<RemoteTaskRunnerWorkItem> pendingTasks,
final Collection<ZkWorker> zkWorkers
)
@ -292,11 +287,11 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
synchronized (lock) {
final Collection<ZkWorker> validWorkers = Collections2.filter(
zkWorkers,
createValidWorkerPredicate(config, workerSetupData)
createValidWorkerPredicate(config)
);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
final int minWorkerCount = workerSetupData.getMinNumWorkers();
final int maxWorkerCount = workerSetupData.getMaxNumWorkers();
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
if (minWorkerCount > maxWorkerCount) {
log.error("Huh? minWorkerCount[%d] > maxWorkerCount[%d]. I give up!", minWorkerCount, maxWorkerCount);
@ -322,7 +317,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
final boolean notTakingActions = currentlyProvisioning.isEmpty()
&& currentlyTerminating.isEmpty();
&& currentlyTerminating.isEmpty();
final boolean shouldScaleUp = notTakingActions
&& validWorkers.size() >= targetWorkerCount
&& targetWorkerCount < maxWorkerCount

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling.ec2;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
@ -29,53 +29,83 @@ import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.setup.EC2NodeData;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.overlord.autoscaling.AutoScaler;
import io.druid.indexing.overlord.autoscaling.AutoScalingData;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import java.util.List;
/**
*/
public class EC2AutoScalingStrategy implements AutoScalingStrategy
public class EC2AutoScaler implements AutoScaler<EC2EnvironmentConfig>
{
private static final EmittingLogger log = new EmittingLogger(EC2AutoScalingStrategy.class);
private static final EmittingLogger log = new EmittingLogger(EC2AutoScaler.class);
private final int minNumWorkers;
private final int maxNumWorkers;
private final EC2EnvironmentConfig envConfig;
private final AmazonEC2 amazonEC2Client;
private final SimpleResourceManagementConfig config;
private final Supplier<WorkerSetupData> workerSetupDataRef;
@Inject
public EC2AutoScalingStrategy(
AmazonEC2 amazonEC2Client,
SimpleResourceManagementConfig config,
Supplier<WorkerSetupData> workerSetupDataRef
@JsonCreator
public EC2AutoScaler(
@JsonProperty("minNumWorkers") int minNumWorkers,
@JsonProperty("maxNumWorkers") int maxNumWorkers,
@JsonProperty("envConfig") EC2EnvironmentConfig envConfig,
@JacksonInject AmazonEC2 amazonEC2Client,
@JacksonInject SimpleResourceManagementConfig config
)
{
this.minNumWorkers = minNumWorkers;
this.maxNumWorkers = maxNumWorkers;
this.envConfig = envConfig;
this.amazonEC2Client = amazonEC2Client;
this.config = config;
this.workerSetupDataRef = workerSetupDataRef;
}
@Override
@JsonProperty
public int getMinNumWorkers()
{
return minNumWorkers;
}
@Override
@JsonProperty
public int getMaxNumWorkers()
{
return maxNumWorkers;
}
@Override
@JsonProperty
public EC2EnvironmentConfig getEnvConfig()
{
return envConfig;
}
@Override
public AutoScalingData provision()
{
try {
final WorkerSetupData setupData = workerSetupDataRef.get();
final EC2NodeData workerConfig = setupData.getNodeData();
final EC2NodeData workerConfig = envConfig.getNodeData();
final String userDataBase64;
if (setupData.getUserData() == null) {
if (envConfig.getUserData() == null) {
userDataBase64 = null;
} else {
if (config.getWorkerVersion() == null) {
userDataBase64 = setupData.getUserData().getUserDataBase64();
userDataBase64 = envConfig.getUserData().getUserDataBase64();
} else {
userDataBase64 = setupData.getUserData().withVersion(config.getWorkerVersion()).getUserDataBase64();
userDataBase64 = envConfig.getUserData()
.withVersion(config.getWorkerVersion())
.getUserDataBase64();
}
}
@ -87,7 +117,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
)
.withInstanceType(workerConfig.getInstanceType())
.withSecurityGroupIds(workerConfig.getSecurityGroupIds())
.withPlacement(new Placement(setupData.getAvailabilityZone()))
.withPlacement(new Placement(envConfig.getAvailabilityZone()))
.withKeyName(workerConfig.getKeyName())
.withUserData(userDataBase64)
);
@ -253,4 +283,48 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
return retVal;
}
@Override
public String toString()
{
return "EC2AutoScaler{" +
"envConfig=" + envConfig +
", maxNumWorkers=" + maxNumWorkers +
", minNumWorkers=" + minNumWorkers +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EC2AutoScaler that = (EC2AutoScaler) o;
if (maxNumWorkers != that.maxNumWorkers) {
return false;
}
if (minNumWorkers != that.minNumWorkers) {
return false;
}
if (envConfig != null ? !envConfig.equals(that.envConfig) : that.envConfig != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = minNumWorkers;
result = 31 * result + maxNumWorkers;
result = 31 * result + (envConfig != null ? envConfig.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,106 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.autoscaling.ec2;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class EC2EnvironmentConfig
{
private final String availabilityZone;
private final EC2NodeData nodeData;
private final EC2UserData userData;
@JsonCreator
public EC2EnvironmentConfig(
@JsonProperty("availabilityZone") String availabilityZone,
@JsonProperty("nodeData") EC2NodeData nodeData,
@JsonProperty("userData") EC2UserData userData
)
{
this.availabilityZone = availabilityZone;
this.nodeData = nodeData;
this.userData = userData;
}
@JsonProperty
public String getAvailabilityZone()
{
return availabilityZone;
}
@JsonProperty
public EC2NodeData getNodeData()
{
return nodeData;
}
@JsonProperty
public EC2UserData getUserData()
{
return userData;
}
@Override
public String toString()
{
return "EC2EnvironmentConfig{" +
"availabilityZone='" + availabilityZone + '\'' +
", nodeData=" + nodeData +
", userData=" + userData +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EC2EnvironmentConfig that = (EC2EnvironmentConfig) o;
if (availabilityZone != null ? !availabilityZone.equals(that.availabilityZone) : that.availabilityZone != null) {
return false;
}
if (nodeData != null ? !nodeData.equals(that.nodeData) : that.nodeData != null) {
return false;
}
if (userData != null ? !userData.equals(that.userData) : that.userData != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = availabilityZone != null ? availabilityZone.hashCode() : 0;
result = 31 * result + (nodeData != null ? nodeData.hashCode() : 0);
result = 31 * result + (userData != null ? userData.hashCode() : 0);
return result;
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
package io.druid.indexing.overlord.autoscaling.ec2;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -101,4 +101,50 @@ public class EC2NodeData
", keyName='" + keyName + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EC2NodeData that = (EC2NodeData) o;
if (maxInstances != that.maxInstances) {
return false;
}
if (minInstances != that.minInstances) {
return false;
}
if (amiId != null ? !amiId.equals(that.amiId) : that.amiId != null) {
return false;
}
if (instanceType != null ? !instanceType.equals(that.instanceType) : that.instanceType != null) {
return false;
}
if (keyName != null ? !keyName.equals(that.keyName) : that.keyName != null) {
return false;
}
if (securityGroupIds != null ? !securityGroupIds.equals(that.securityGroupIds) : that.securityGroupIds != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = amiId != null ? amiId.hashCode() : 0;
result = 31 * result + (instanceType != null ? instanceType.hashCode() : 0);
result = 31 * result + minInstances;
result = 31 * result + maxInstances;
result = 31 * result + (securityGroupIds != null ? securityGroupIds.hashCode() : 0);
result = 31 * result + (keyName != null ? keyName.hashCode() : 0);
return result;
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
package io.druid.indexing.overlord.autoscaling.ec2;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
package io.druid.indexing.overlord.autoscaling.ec2;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@ -85,6 +85,44 @@ public class GalaxyEC2UserData implements EC2UserData<GalaxyEC2UserData>
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GalaxyEC2UserData that = (GalaxyEC2UserData) o;
if (env != null ? !env.equals(that.env) : that.env != null) {
return false;
}
if (jsonMapper != null ? !jsonMapper.equals(that.jsonMapper) : that.jsonMapper != null) {
return false;
}
if (type != null ? !type.equals(that.type) : that.type != null) {
return false;
}
if (version != null ? !version.equals(that.version) : that.version != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = jsonMapper != null ? jsonMapper.hashCode() : 0;
result = 31 * result + (env != null ? env.hashCode() : 0);
result = 31 * result + (version != null ? version.hashCode() : 0);
result = 31 * result + (type != null ? type.hashCode() : 0);
return result;
}
@Override
public String toString()
{

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
package io.druid.indexing.overlord.autoscaling.ec2;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -78,6 +78,42 @@ public class StringEC2UserData implements EC2UserData<StringEC2UserData>
return Base64.encodeBase64String(finalData.getBytes(Charsets.UTF_8));
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StringEC2UserData that = (StringEC2UserData) o;
if (data != null ? !data.equals(that.data) : that.data != null) {
return false;
}
if (version != null ? !version.equals(that.version) : that.version != null) {
return false;
}
if (versionReplacementString != null
? !versionReplacementString.equals(that.versionReplacementString)
: that.versionReplacementString != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = data != null ? data.hashCode() : 0;
result = 31 * result + (versionReplacementString != null ? versionReplacementString.hashCode() : 0);
result = 31 * result + (version != null ? version.hashCode() : 0);
return result;
}
@Override
public String toString()
{

View File

@ -37,14 +37,15 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
import io.druid.metadata.EntryExistsException;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.metadata.EntryExistsException;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
@ -78,6 +79,9 @@ public class OverlordResource
private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager;
private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
@Deprecated
private AtomicReference<WorkerSetupData> workerSetupDataRef = null;
@Inject
@ -165,6 +169,36 @@ public class OverlordResource
);
}
@GET
@Path("/worker")
@Produces("application/json")
public Response getWorkerConfig()
{
if (workerConfigRef == null) {
workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class);
}
return Response.ok(workerConfigRef.get()).build();
}
@POST
@Path("/worker")
@Consumes("application/json")
public Response setWorkerConfig(
final WorkerBehaviorConfig workerBehaviorConfig
)
{
if (!configManager.set(WorkerBehaviorConfig.CONFIG_KEY, workerBehaviorConfig)) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
log.info("Updating Worker configs: %s", workerBehaviorConfig);
return Response.ok().build();
}
@Deprecated
@GET
@Path("/worker/setup")
@Produces("application/json")
@ -177,6 +211,7 @@ public class OverlordResource
return Response.ok(workerSetupDataRef.get()).build();
}
@Deprecated
@POST
@Path("/worker/setup")
@Consumes("application/json")

View File

@ -0,0 +1,76 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
/**
*/
public class FillCapacityWithAffinityConfig
{
// key:Datasource, value:[nodeHostNames]
private Map<String, List<String>> affinity = Maps.newHashMap();
@JsonCreator
public FillCapacityWithAffinityConfig(
@JsonProperty("affinity") Map<String, List<String>> affinity
)
{
this.affinity = affinity;
}
@JsonProperty
public Map<String, List<String>> getAffinity()
{
return affinity;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FillCapacityWithAffinityConfig that = (FillCapacityWithAffinityConfig) o;
if (affinity != null
? !Maps.difference(affinity, that.affinity).entriesDiffering().isEmpty()
: that.affinity != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return affinity != null ? affinity.hashCode() : 0;
}
}

View File

@ -0,0 +1,125 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import java.util.List;
import java.util.Set;
/**
*/
public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWorkerSelectStrategy
{
private final FillCapacityWithAffinityConfig affinityConfig;
private final Set<String> affinityWorkerHosts = Sets.newHashSet();
@JsonCreator
public FillCapacityWithAffinityWorkerSelectStrategy(
@JsonProperty("affinityConfig") FillCapacityWithAffinityConfig affinityConfig
)
{
this.affinityConfig = affinityConfig;
for (List<String> affinityWorkers : affinityConfig.getAffinity().values()) {
for (String affinityWorker : affinityWorkers) {
this.affinityWorkerHosts.add(affinityWorker);
}
}
}
@JsonProperty
public FillCapacityWithAffinityConfig getAffinityConfig()
{
return affinityConfig;
}
@Override
public Optional<ImmutableZkWorker> findWorkerForTask(
final RemoteTaskRunnerConfig config,
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
final Task task
)
{
// don't run other datasources on affinity workers; we only want our configured datasources to run on them
ImmutableMap.Builder<String, ImmutableZkWorker> builder = new ImmutableMap.Builder<>();
for (String workerHost : zkWorkers.keySet()) {
if (!affinityWorkerHosts.contains(workerHost)) {
builder.put(workerHost, zkWorkers.get(workerHost));
}
}
ImmutableMap<String, ImmutableZkWorker> eligibleWorkers = builder.build();
List<String> workerHosts = affinityConfig.getAffinity().get(task.getDataSource());
if (workerHosts == null) {
return super.findWorkerForTask(config, eligibleWorkers, task);
}
ImmutableMap.Builder<String, ImmutableZkWorker> affinityBuilder = new ImmutableMap.Builder<>();
for (String workerHost : workerHosts) {
ImmutableZkWorker zkWorker = zkWorkers.get(workerHost);
if (zkWorker != null) {
affinityBuilder.put(workerHost, zkWorker);
}
}
ImmutableMap<String, ImmutableZkWorker> affinityWorkers = affinityBuilder.build();
if (!affinityWorkers.isEmpty()) {
Optional<ImmutableZkWorker> retVal = super.findWorkerForTask(config, affinityWorkers, task);
if (retVal.isPresent()) {
return retVal;
}
}
return super.findWorkerForTask(config, eligibleWorkers, task);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FillCapacityWithAffinityWorkerSelectStrategy that = (FillCapacityWithAffinityWorkerSelectStrategy) o;
if (affinityConfig != null ? !affinityConfig.equals(that.affinityConfig) : that.affinityConfig != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return affinityConfig != null ? affinityConfig.hashCode() : 0;
}
}

View File

@ -23,7 +23,6 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
@ -35,15 +34,9 @@ import java.util.TreeSet;
*/
public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
{
private final RemoteTaskRunnerConfig config;
@Inject
public FillCapacityWorkerSelectStrategy(RemoteTaskRunnerConfig config)
{
this.config = config;
}
@Override
public Optional<ImmutableZkWorker> findWorkerForTask(
final RemoteTaskRunnerConfig config,
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
final Task task
)

View File

@ -0,0 +1,95 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.overlord.autoscaling.AutoScaler;
/**
*/
public class WorkerBehaviorConfig
{
public static final String CONFIG_KEY = "worker.config";
private final WorkerSelectStrategy selectStrategy;
private final AutoScaler autoScaler;
@JsonCreator
public WorkerBehaviorConfig(
@JsonProperty("selectStrategy") WorkerSelectStrategy selectStrategy,
@JsonProperty("autoScaler") AutoScaler autoScaler
)
{
this.selectStrategy = selectStrategy;
this.autoScaler = autoScaler;
}
@JsonProperty
public WorkerSelectStrategy getSelectStrategy()
{
return selectStrategy;
}
@JsonProperty
public AutoScaler getAutoScaler()
{
return autoScaler;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WorkerBehaviorConfig that = (WorkerBehaviorConfig) o;
if (autoScaler != null ? !autoScaler.equals(that.autoScaler) : that.autoScaler != null) {
return false;
}
if (selectStrategy != null ? !selectStrategy.equals(that.selectStrategy) : that.selectStrategy != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = selectStrategy != null ? selectStrategy.hashCode() : 0;
result = 31 * result + (autoScaler != null ? autoScaler.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "WorkerConfiguration{" +
"selectStrategy=" + selectStrategy +
", autoScaler=" + autoScaler +
'}';
}
}

View File

@ -25,28 +25,29 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.ZkWorker;
import java.util.Map;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
/**
* The {@link io.druid.indexing.overlord.RemoteTaskRunner} uses this class to select a worker to assign tasks to.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FillCapacityWorkerSelectStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class)
@JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "fillCapacityWithAffinity", value = FillCapacityWithAffinityWorkerSelectStrategy.class)
})
public interface WorkerSelectStrategy
{
/**
* Customizable logic for selecting a worker to run a task.
*
* @param config A config for running remote tasks
* @param zkWorkers An immutable map of workers to choose from.
* @param task The task to assign.
*
* @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available.
*/
public Optional<ImmutableZkWorker> findWorkerForTask(
final RemoteTaskRunnerConfig config,
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
final Task task
);

View File

@ -21,9 +21,12 @@ package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData;
import io.druid.indexing.overlord.autoscaling.ec2.EC2UserData;
/**
*/
@Deprecated
public class WorkerSetupData
{
public static final String CONFIG_KEY = "worker.setup";

View File

@ -404,7 +404,7 @@ public class RemoteTaskRunnerTest
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
null,
new FillCapacityWorkerSelectStrategy(config)
new FillCapacityWorkerSelectStrategy()
);
remoteTaskRunner.start();

View File

@ -21,15 +21,16 @@ package io.druid.indexing.overlord;
import com.google.common.base.Charsets;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.overlord.setup.EC2UserData;
import io.druid.indexing.overlord.setup.GalaxyEC2UserData;
import io.druid.indexing.overlord.setup.StringEC2UserData;
import io.druid.indexing.overlord.autoscaling.ec2.EC2UserData;
import io.druid.indexing.overlord.autoscaling.ec2.GalaxyEC2UserData;
import io.druid.indexing.overlord.autoscaling.ec2.StringEC2UserData;
import org.apache.commons.codec.binary.Base64;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
@Deprecated
public class WorkerSetupDataTest
{
@Test

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
@ -28,10 +28,10 @@ import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.collect.Lists;
import io.druid.common.guava.DSuppliers;
import io.druid.indexing.overlord.setup.EC2NodeData;
import io.druid.indexing.overlord.setup.GalaxyEC2UserData;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.overlord.autoscaling.ec2.EC2AutoScaler;
import io.druid.indexing.overlord.autoscaling.ec2.EC2EnvironmentConfig;
import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData;
import io.druid.indexing.overlord.autoscaling.ec2.GalaxyEC2UserData;
import io.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock;
import org.junit.After;
@ -41,11 +41,10 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class EC2AutoScalingStrategyTest
public class EC2AutoScalerTest
{
private static final String AMI_ID = "dummy";
private static final String INSTANCE_ID = "theInstance";
@ -56,8 +55,7 @@ public class EC2AutoScalingStrategyTest
private DescribeInstancesResult describeInstancesResult;
private Reservation reservation;
private Instance instance;
private EC2AutoScalingStrategy strategy;
private AtomicReference<WorkerSetupData> workerSetupData;
private SimpleResourceManagementConfig managementConfig;
@Before
public void setUp() throws Exception
@ -66,7 +64,6 @@ public class EC2AutoScalingStrategyTest
runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class);
reservation = EasyMock.createMock(Reservation.class);
workerSetupData = new AtomicReference<WorkerSetupData>(null);
instance = new Instance()
.withInstanceId(INSTANCE_ID)
@ -74,11 +71,7 @@ public class EC2AutoScalingStrategyTest
.withImageId(AMI_ID)
.withPrivateIpAddress(IP);
strategy = new EC2AutoScalingStrategy(
amazonEC2Client,
new SimpleResourceManagementConfig().setWorkerPort(8080).setWorkerVersion(""),
DSuppliers.of(workerSetupData)
);
managementConfig = new SimpleResourceManagementConfig().setWorkerPort(8080).setWorkerVersion("");
}
@After
@ -93,14 +86,16 @@ public class EC2AutoScalingStrategyTest
@Test
public void testScale()
{
workerSetupData.set(
new WorkerSetupData(
0,
1,
"",
EC2AutoScaler autoScaler = new EC2AutoScaler(
0,
1,
new EC2EnvironmentConfig(
"us-east-1a",
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"),
new GalaxyEC2UserData(new DefaultObjectMapper(), "env", "version", "type")
)
new GalaxyEC2UserData(new DefaultObjectMapper(), "env", "version", "type")
),
amazonEC2Client,
managementConfig
);
EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn(
@ -121,12 +116,12 @@ public class EC2AutoScalingStrategyTest
EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).atLeastOnce();
EasyMock.replay(reservation);
AutoScalingData created = strategy.provision();
AutoScalingData created = autoScaler.provision();
Assert.assertEquals(created.getNodeIds().size(), 1);
Assert.assertEquals("theInstance", created.getNodeIds().get(0));
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP"));
AutoScalingData deleted = autoScaler.terminate(Arrays.asList("dummyIP"));
Assert.assertEquals(deleted.getNodeIds().size(), 1);
Assert.assertEquals(INSTANCE_ID, deleted.getNodeIds().get(0));

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.scaling;
package io.druid.indexing.overlord.autoscaling;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@ -32,7 +32,7 @@ import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
@ -55,20 +55,16 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class SimpleResourceManagementStrategyTest
{
private AutoScalingStrategy autoScalingStrategy;
private AutoScaler autoScaler;
private Task testTask;
private SimpleResourceManagementConfig simpleResourceManagementConfig;
private SimpleResourceManagementStrategy simpleResourceManagementStrategy;
private AtomicReference<WorkerSetupData> workerSetupData;
private AtomicReference<WorkerBehaviorConfig> workerConfig;
@Before
public void setUp() throws Exception
{
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
workerSetupData = new AtomicReference<>(
new WorkerSetupData(
0, 2, null, null, null
)
);
autoScaler = EasyMock.createMock(AutoScaler.class);
testTask = new TestMergeTask(
"task1",
@ -88,27 +84,38 @@ public class SimpleResourceManagementStrategyTest
),
Lists.<AggregatorFactory>newArrayList()
);
simpleResourceManagementConfig = new SimpleResourceManagementConfig()
.setWorkerIdleTimeout(new Period(0))
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(1)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion("");
workerConfig = new AtomicReference<>(
new WorkerBehaviorConfig(
null,
autoScaler
)
);
simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
autoScalingStrategy,
new SimpleResourceManagementConfig()
.setWorkerIdleTimeout(new Period(0))
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(1)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion(""),
DSuppliers.of(workerSetupData)
simpleResourceManagementConfig,
DSuppliers.of(workerConfig)
);
}
@Test
public void testSuccessfulProvision() throws Exception
{
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("aNode"))
);
EasyMock.replay(autoScalingStrategy);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -125,18 +132,20 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(autoScaler);
}
@Test
public void testSomethingProvisioning() throws Exception
{
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"))
);
EasyMock.replay(autoScalingStrategy);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -172,7 +181,7 @@ public class SimpleResourceManagementStrategyTest
createdTime.equals(anotherCreatedTime)
);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(autoScaler);
}
@Test
@ -184,14 +193,16 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expectLastCall();
EasyMock.replay(emitter);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScalingStrategy.terminateWithIds(EasyMock.<List<String>>anyObject()))
EasyMock.expect(autoScaler.terminateWithIds(EasyMock.<List<String>>anyObject()))
.andReturn(null);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"))
);
EasyMock.replay(autoScalingStrategy);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -229,21 +240,21 @@ public class SimpleResourceManagementStrategyTest
createdTime.equals(anotherCreatedTime)
);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(autoScaler);
EasyMock.verify(emitter);
}
@Test
public void testDoSuccessfulTerminate() throws Exception
{
workerSetupData.set(new WorkerSetupData(0, 1, null, null, null));
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(1);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn(
EasyMock.expect(autoScaler.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList())
);
EasyMock.replay(autoScalingStrategy);
EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -260,20 +271,20 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(autoScaler);
}
@Test
public void testSomethingTerminating() throws Exception
{
workerSetupData.set(new WorkerSetupData(0, 1, null, null, null));
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(1).times(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2);
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn(
EasyMock.expect(autoScaler.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList("ip"))
);
EasyMock.replay(autoScalingStrategy);
EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -305,16 +316,18 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(autoScaler);
}
@Test
public void testNoActionNeeded() throws Exception
{
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
EasyMock.reset(autoScaler);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -327,12 +340,14 @@ public class SimpleResourceManagementStrategyTest
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(autoScaler);
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
EasyMock.reset(autoScaler);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -345,17 +360,19 @@ public class SimpleResourceManagementStrategyTest
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(autoScaler);
}
@Test
public void testMinCountIncrease() throws Exception
{
// Don't terminate anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
EasyMock.reset(autoScaler);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
@ -363,13 +380,15 @@ public class SimpleResourceManagementStrategyTest
)
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(autoScaler);
// Don't provision anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
EasyMock.reset(autoScaler);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
@ -377,22 +396,22 @@ public class SimpleResourceManagementStrategyTest
)
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(autoScaler);
EasyMock.reset(autoScaler);
// Increase minNumWorkers
workerSetupData.set(new WorkerSetupData(3, 5, null, null, null));
// Should provision two new workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h3"))
);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
// Should provision two new workers
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h4"))
);
EasyMock.replay(autoScalingStrategy);
EasyMock.replay(autoScaler);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
@ -400,14 +419,14 @@ public class SimpleResourceManagementStrategyTest
)
);
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(autoScaler);
}
@Test
public void testNullWorkerSetupData() throws Exception
public void testNullWorkerConfig() throws Exception
{
workerSetupData.set(null);
EasyMock.replay(autoScalingStrategy);
workerConfig.set(null);
EasyMock.replay(autoScaler);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
@ -430,7 +449,7 @@ public class SimpleResourceManagementStrategyTest
Assert.assertFalse(terminatedSomething);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(autoScaler);
}
private static class TestZkWorker extends ZkWorker

View File

@ -0,0 +1,117 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import junit.framework.Assert;
import org.junit.Test;
import java.util.Arrays;
public class FillCapacityWithAffinityWorkerSelectStrategyTest
{
@Test
public void testFindWorkerForTask() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1"), 0,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("localhost", worker.getWorker().getHost());
}
@Test
public void testFindWorkerForTaskWithNulls() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1"), 0,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@Test
public void testIsolation() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
);
Assert.assertFalse(optional.isPresent());
}
}

View File

@ -0,0 +1,87 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.overlord.autoscaling.ec2.EC2AutoScaler;
import io.druid.indexing.overlord.autoscaling.ec2.EC2EnvironmentConfig;
import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData;
import io.druid.indexing.overlord.autoscaling.ec2.StringEC2UserData;
import io.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.junit.Test;
import java.util.Arrays;
public class WorkerBehaviorConfigTest
{
@Test
public void testSerde() throws Exception
{
WorkerBehaviorConfig config = new WorkerBehaviorConfig(
new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(
ImmutableMap.of("foo", Arrays.asList("localhost"))
)
),
new EC2AutoScaler(
7,
11,
new EC2EnvironmentConfig(
"us-east-1a",
new EC2NodeData(
"amiid",
"instanceType",
3,
5,
Arrays.asList("securityGroupIds"),
"keyNames"
),
new StringEC2UserData(
"availZone",
"replace",
"version"
)
),
null,
null
)
);
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues()
{
@Override
public Object findInjectableValue(
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
)
{
return null;
}
}
);
Assert.assertEquals(config, mapper.readValue(mapper.writeValueAsBytes(config), WorkerBehaviorConfig.class));
}
}

View File

@ -59,20 +59,16 @@ import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskRunnerFactory;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactoryImpl;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.overlord.http.OverlordRedirectInfo;
import io.druid.indexing.overlord.http.OverlordResource;
import io.druid.indexing.overlord.scaling.AutoScalingStrategy;
import io.druid.indexing.overlord.scaling.EC2AutoScalingStrategy;
import io.druid.indexing.overlord.scaling.NoopAutoScalingStrategy;
import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactory;
import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactoryImpl;
import io.druid.indexing.overlord.scaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.scaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.scaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
@ -117,7 +113,9 @@ public class CliOverlord extends ServerRunnable
@Override
public void configure(Binder binder)
{
binder.bindConstant().annotatedWith(Names.named("serviceName")).to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME);
binder.bindConstant()
.annotatedWith(Names.named("serviceName"))
.to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME);
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
@ -202,19 +200,8 @@ public class CliOverlord extends ServerRunnable
biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
PolyBind.createChoice(
binder,
"druid.indexer.runner.workerSelectStrategy.type",
Key.get(WorkerSelectStrategy.class),
Key.get(FillCapacityWorkerSelectStrategy.class)
);
final MapBinder<String, WorkerSelectStrategy> stratBinder = PolyBind.optionBinder(
binder,
Key.get(WorkerSelectStrategy.class)
);
stratBinder.addBinding("fillCapacity").to(FillCapacityWorkerSelectStrategy.class);
binder.bind(FillCapacityWorkerSelectStrategy.class).in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null);
JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null);
}
private void configureAutoscale(Binder binder)
@ -224,24 +211,6 @@ public class CliOverlord extends ServerRunnable
.to(SimpleResourceManagementStrategy.class)
.in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null);
PolyBind.createChoice(
binder,
"druid.indexer.autoscale.strategy",
Key.get(AutoScalingStrategy.class),
Key.get(NoopAutoScalingStrategy.class)
);
final MapBinder<String, AutoScalingStrategy> autoScalingBinder = PolyBind.optionBinder(
binder, Key.get(AutoScalingStrategy.class)
);
autoScalingBinder.addBinding("ec2").to(EC2AutoScalingStrategy.class);
binder.bind(EC2AutoScalingStrategy.class).in(LazySingleton.class);
autoScalingBinder.addBinding("noop").to(NoopAutoScalingStrategy.class);
binder.bind(NoopAutoScalingStrategy.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
}
},