Mirror of https://github.com/apache/druid.git

Merge pull request #339 from metamx/autoscaling

Autoscaling: Move target count independent of actual count.

Commit 3ec2766cd3
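
Note on the change: rather than deciding to provision or terminate straight from the current worker count, SimpleResourceManagementStrategy now keeps a persistent targetWorkerCount and nudges it up or down one step at a time, and only while the pool is at steady state. A minimal self-contained sketch of that rule, distilled from the diff below (class and parameter names here are illustrative, not the actual Druid types):

    // Sketch of the target-count rule introduced by this commit (simplified;
    // the real class synchronizes on a lock and reads WorkerSetupData).
    public class TargetCountSketch
    {
      private int targetWorkerCount = -1;

      public int update(
          int validWorkers,       // workers running an acceptable version
          int provisioning,       // nodes requested from the cloud, not yet up
          int terminating,        // nodes asked to shut down, not yet gone
          boolean pendingTooLong, // some task waited beyond the pending timeout
          boolean anyLazyWorker,  // some valid worker is idle past its timeout
          int minWorkers,
          int maxWorkers
      )
      {
        if (targetWorkerCount < 0) {
          targetWorkerCount = validWorkers; // the real code initializes to the full current pool size
        }
        // Only move the target when nothing is in flight and the pool has
        // actually reached the previous target.
        final boolean atSteadyState = provisioning == 0
                                      && terminating == 0
                                      && validWorkers == targetWorkerCount;
        if (atSteadyState && pendingTooLong && targetWorkerCount < maxWorkers) {
          targetWorkerCount++;
        } else if (atSteadyState && anyLazyWorker && targetWorkerCount > minWorkers) {
          targetWorkerCount--;
        }
        return targetWorkerCount;
      }
    }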

@@ -35,9 +35,9 @@ import org.joda.time.DateTime;
public class NoopTask extends AbstractTask
{
private static final Logger log = new Logger(NoopTask.class);
private static int defaultRunTime = 2500;
private static int defaultIsReadyTime = 0;
private static IsReadyResult defaultIsReadyResult = IsReadyResult.YES;
private static final int defaultRunTime = 2500;
private static final int defaultIsReadyTime = 0;
private static final IsReadyResult defaultIsReadyResult = IsReadyResult.YES;

enum IsReadyResult
{

@@ -139,4 +139,9 @@ public class NoopTask extends AbstractTask
log.info("Woke up!");
return TaskStatus.success(getId());
}

public static NoopTask create()
{
return new NoopTask(null, 0, 0, null, null);
}
}
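
Note: the NoopTask constants above become final, and the new NoopTask.create() factory gives tests a one-line way to build a do-nothing task with those defaults; the test changes at the bottom of this commit lean on it. A trivial usage sketch (variable name illustrative):

    // A no-op task with the defaults shown above: 2500 ms run time,
    // 0 ms isReady delay, IsReadyResult.YES.
    Task placeholder = NoopTask.create();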

@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

@@ -251,26 +252,26 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
@Override
public Collection<ZkWorker> getWorkers()
{
return zkWorkers.values();
return ImmutableList.copyOf(zkWorkers.values());
}

@Override
public Collection<RemoteTaskRunnerWorkItem> getRunningTasks()
{
return runningTasks.values();
return ImmutableList.copyOf(runningTasks.values());
}

@Override
public Collection<RemoteTaskRunnerWorkItem> getPendingTasks()
{
return pendingTasks.values();
return ImmutableList.copyOf(pendingTasks.values());
}

@Override
public Collection<RemoteTaskRunnerWorkItem> getKnownTasks()
{
// Racey, since there is a period of time during assignment when a task is neither pending nor running
return Lists.newArrayList(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
}

public ZkWorker findWorkerRunningTask(String taskId)
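
Note: each getter above previously returned the live values() view of an internal concurrent map, so callers could iterate a collection that mutated underneath them; ImmutableList.copyOf(...) hands out a point-in-time snapshot instead. A small illustration of the difference (the map and class here are hypothetical stand-ins):

    import com.google.common.collect.ImmutableList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class SnapshotDemo
    {
      private final ConcurrentMap<String, String> tasks = new ConcurrentHashMap<>();

      // Live view: reflects later puts/removes; risky to iterate while mutating.
      Collection<String> liveView() { return tasks.values(); }

      // Snapshot: contents fixed at the moment of the call.
      List<String> snapshot() { return ImmutableList.copyOf(tasks.values()); }
    }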

@@ -28,12 +28,10 @@ import java.util.List;
public class AutoScalingData
{
private final List<String> nodeIds;
private final List nodes;

public AutoScalingData(List<String> nodeIds, List nodes)
public AutoScalingData(List<String> nodeIds)
{
this.nodeIds = nodeIds;
this.nodes = nodes;
}

@JsonProperty

@@ -42,17 +40,11 @@ public class AutoScalingData
return nodeIds;
}

public List getNodes()
{
return nodes;
}

@Override
public String toString()
{
return "AutoScalingData{" +
"nodeIds=" + nodeIds +
", nodes=" + nodes +
'}';
}
}
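
Note: AutoScalingData is reduced to a list of provider-assigned node IDs; the untyped List of raw provider objects (EC2 Instance handles in practice) no longer leaks out of the scaling strategy. The class after this change, in outline (the getNodeIds accessor is implied by the surrounding context lines):

    public class AutoScalingData
    {
      private final List<String> nodeIds;

      public AutoScalingData(List<String> nodeIds)
      {
        this.nodeIds = nodeIds;
      }

      @JsonProperty
      public List<String> getNodeIds()
      {
        return nodeIds;
      }

      @Override
      public String toString()
      {
        return "AutoScalingData{" + "nodeIds=" + nodeIds + '}';
      }
    }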

@@ -125,8 +125,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
return input.getInstanceId();
}
}
),
result.getReservation().getInstances()
)
);
}
catch (Exception e) {

@@ -140,7 +139,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
public AutoScalingData terminate(List<String> ips)
{
if (ips.isEmpty()) {
return new AutoScalingData(Lists.<String>newArrayList(), Lists.<Instance>newArrayList());
return new AutoScalingData(Lists.<String>newArrayList());
}

DescribeInstancesResult result = amazonEC2Client.describeInstances(

@@ -184,8 +183,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
return String.format("%s:%s", input, config.getWorkerPort());
}
}
),
instances
)
);
}
catch (Exception e) {

@@ -20,13 +20,16 @@
package io.druid.indexing.overlord.scaling;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskRunnerWorkItem;

@@ -38,7 +41,6 @@ import org.joda.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

/**
 */

@@ -48,211 +50,190 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat

private final AutoScalingStrategy autoScalingStrategy;
private final SimpleResourceManagementConfig config;
private final Supplier<WorkerSetupData> workerSetupdDataRef;
private final Supplier<WorkerSetupData> workerSetupDataRef;
private final ScalingStats scalingStats;

private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();
private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();
private final Object lock = new Object();
private final Set<String> currentlyProvisioning = Sets.newHashSet();
private final Set<String> currentlyTerminating = Sets.newHashSet();

private volatile DateTime lastProvisionTime = new DateTime();
private volatile DateTime lastTerminateTime = new DateTime();
private int targetWorkerCount = -1;
private DateTime lastProvisionTime = new DateTime();
private DateTime lastTerminateTime = new DateTime();

@Inject
public SimpleResourceManagementStrategy(
AutoScalingStrategy autoScalingStrategy,
SimpleResourceManagementConfig config,
Supplier<WorkerSetupData> workerSetupdDataRef
Supplier<WorkerSetupData> workerSetupDataRef
)
{
this.autoScalingStrategy = autoScalingStrategy;
this.config = config;
this.workerSetupdDataRef = workerSetupdDataRef;
this.workerSetupDataRef = workerSetupDataRef;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
}

@Override
public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
final WorkerSetupData workerSetupData = workerSetupdDataRef.get();

final String minVersion = workerSetupData.getMinVersion() == null
? config.getWorkerVersion()
: workerSetupData.getMinVersion();
int maxNumWorkers = workerSetupData.getMaxNumWorkers();

int currValidWorkers = 0;
for (ZkWorker zkWorker : zkWorkers) {
if (zkWorker.isValidVersion(minVersion)) {
currValidWorkers++;
synchronized (lock) {
boolean didProvision = false;
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
if (workerSetupData == null) {
log.warn("No workerSetupData available, cannot provision new workers.");
return false;
}
}
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);
final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();

if (currValidWorkers >= maxNumWorkers) {
log.debug(
"Cannot scale anymore. Num workers = %d, Max num workers = %d",
zkWorkers.size(),
workerSetupdDataRef.get().getMaxNumWorkers()
);
return false;
}

List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.<ZkWorker, String>transform(
zkWorkers,
new Function<ZkWorker, String>()
{
@Override
public String apply(ZkWorker input)
final List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.<ZkWorker, String>transform(
zkWorkers,
new Function<ZkWorker, String>()
{
return input.getWorker().getIp();
@Override
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}
}
}
)
)
);
)
)
);
currentlyProvisioning.removeAll(workerNodeIds);

currentlyProvisioning.removeAll(workerNodeIds);
boolean nothingProvisioning = currentlyProvisioning.isEmpty();
updateTargetWorkerCount(pendingTasks, zkWorkers);

if (nothingProvisioning) {
if (hasTaskPendingBeyondThreshold(pendingTasks)) {
AutoScalingData provisioned = autoScalingStrategy.provision();

if (provisioned != null) {
currentlyProvisioning.addAll(provisioned.getNodeIds());
int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
while (want > 0) {
final AutoScalingData provisioned = autoScalingStrategy.provision();
final List<String> newNodes;
if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) {
break;
} else {
currentlyProvisioning.addAll(newNodes);
lastProvisionTime = new DateTime();
scalingStats.addProvisionEvent(provisioned);

return true;
want -= provisioned.getNodeIds().size();
didProvision = true;
}
}
} else {
Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());

log.info(
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker. Current wait time: %s",
currentlyProvisioning,
durSinceLastProvision
);
if (!currentlyProvisioning.isEmpty()) {
Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());

if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node provisioning taking too long!")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision);

List<String> nodeIps = autoScalingStrategy.idToIpLookup(Lists.newArrayList(currentlyProvisioning));
autoScalingStrategy.terminate(nodeIps);
currentlyProvisioning.clear();
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node provisioning taking too long!")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
.emit();

List<String> nodeIps = autoScalingStrategy.idToIpLookup(Lists.newArrayList(currentlyProvisioning));
autoScalingStrategy.terminate(nodeIps);
currentlyProvisioning.clear();
}
}
}

return false;
return didProvision;
}
}

@Override
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
Set<String> workerNodeIds = Sets.newHashSet(
autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
zkWorkers,
new Function<ZkWorker, String>()
{
@Override
public String apply(ZkWorker input)
synchronized (lock) {
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
if (workerSetupData == null) {
log.warn("No workerSetupData available, cannot terminate workers.");
return false;
}

boolean didTerminate = false;
final Set<String> workerNodeIds = Sets.newHashSet(
autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
zkWorkers,
new Function<ZkWorker, String>()
{
return input.getWorker().getIp();
@Override
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}
}
}
)
)
)
);

Set<String> stillExisting = Sets.newHashSet();
for (String s : currentlyTerminating) {
if (workerNodeIds.contains(s)) {
stillExisting.add(s);
}
}
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);
boolean nothingTerminating = currentlyTerminating.isEmpty();

if (nothingTerminating) {
final int minNumWorkers = workerSetupdDataRef.get().getMinNumWorkers();
if (zkWorkers.size() <= minNumWorkers) {
log.info("Only [%d <= %d] nodes in the cluster, not terminating anything.", zkWorkers.size(), minNumWorkers);
return false;
}

List<ZkWorker> thoseLazyWorkers = Lists.newArrayList(
FunctionalIterable
.create(zkWorkers)
.filter(
new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker input)
{
return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
}
}
)
)
);

int maxPossibleNodesTerminated = zkWorkers.size() - minNumWorkers;
int numNodesToTerminate = Math.min(maxPossibleNodesTerminated, thoseLazyWorkers.size());
if (numNodesToTerminate <= 0) {
log.info("Found no nodes to terminate.");
return false;
}

AutoScalingData terminated = autoScalingStrategy.terminate(
Lists.transform(
thoseLazyWorkers.subList(0, numNodesToTerminate),
new Function<ZkWorker, String>()
{
@Override
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}
}
)
);

if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
scalingStats.addTerminateEvent(terminated);

return true;
final Set<String> stillExisting = Sets.newHashSet();
for (String s : currentlyTerminating) {
if (workerNodeIds.contains(s)) {
stillExisting.add(s);
}
}
} else {
Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);

log.info(
"%s still terminating. Wait for all nodes to terminate before trying again.",
currentlyTerminating
);
updateTargetWorkerCount(pendingTasks, zkWorkers);

if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node termination taking too long!")
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
.addData("terminatingCount", currentlyTerminating.size())
.emit();
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
if (currentlyTerminating.isEmpty()) {
final int want = zkWorkers.size() - targetWorkerCount;
if (want > 0) {
final List<String> laziestWorkerIps =
FluentIterable.from(zkWorkers)
.filter(isLazyWorker)
.limit(want)
.transform(
new Function<ZkWorker, String>()
{
@Override
public String apply(ZkWorker zkWorker)
{
return zkWorker.getWorker().getIp();
}
}
)
.toList();

currentlyTerminating.clear();
log.info(
"Terminating %,d workers (wanted %,d): %s",
laziestWorkerIps.size(),
want,
Joiner.on(", ").join(laziestWorkerIps)
);

final AutoScalingData terminated = autoScalingStrategy.terminate(laziestWorkerIps);
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
scalingStats.addTerminateEvent(terminated);
didTerminate = true;
}
}
} else {
Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());

log.info("%s terminating. Current wait time: %s", currentlyTerminating, durSinceLastTerminate);

if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node termination taking too long!")
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
.addData("terminatingCount", currentlyTerminating.size())
.emit();

currentlyTerminating.clear();
}
}

return didTerminate;
}

return false;
}

@Override
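
Note: doProvision now sizes its request explicitly: want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size()), then it keeps calling provision() until the gap is covered or the strategy stops producing nodes. A compact sketch of that loop (the Scaler interface is a stand-in for AutoScalingStrategy):

    import java.util.List;

    class ProvisionLoopSketch
    {
      interface Scaler
      {
        List<String> provision(); // newly requested node IDs, possibly empty or null
      }

      // Returns true if at least one provision request was issued.
      static boolean provisionUpTo(int target, int valid, List<String> inFlight, Scaler scaler)
      {
        boolean didProvision = false;
        int want = target - (valid + inFlight.size());
        while (want > 0) {
          final List<String> newNodes = scaler.provision();
          if (newNodes == null || newNodes.isEmpty()) {
            break; // provider produced nothing; retry on a later cycle
          }
          inFlight.addAll(newNodes);
          want -= newNodes.size();
          didProvision = true;
        }
        return didProvision;
      }
    }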

@@ -261,16 +242,122 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return scalingStats;
}

private boolean hasTaskPendingBeyondThreshold(Collection<RemoteTaskRunnerWorkItem> pendingTasks)
private static Predicate<ZkWorker> createLazyWorkerPredicate(
final SimpleResourceManagementConfig config,
final WorkerSetupData workerSetupData
)
{
long now = System.currentTimeMillis();
for (TaskRunnerWorkItem pendingTask : pendingTasks) {
final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now);
final Duration timeoutDuration = config.getPendingTaskTimeout().toStandardDuration();
if (durationSinceInsertion.isEqual(timeoutDuration) || durationSinceInsertion.isLongerThan(timeoutDuration)) {
return true;
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);

return new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker worker)
{
final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
return worker.getRunningTasks().isEmpty() && (itHasBeenAWhile || !isValidWorker.apply(worker));
}
};
}

private static Predicate<ZkWorker> createValidWorkerPredicate(
final SimpleResourceManagementConfig config,
final WorkerSetupData workerSetupData
)
{
return new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker zkWorker)
{
final String minVersion = workerSetupData.getMinVersion() != null
? workerSetupData.getMinVersion()
: config.getWorkerVersion();
if (minVersion == null) {
throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
}
return zkWorker.isValidVersion(minVersion);
}
};
}

private void updateTargetWorkerCount(
final Collection<RemoteTaskRunnerWorkItem> pendingTasks,
final Collection<ZkWorker> zkWorkers
)
{
synchronized (lock) {
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
final Collection<ZkWorker> validWorkers = Collections2.filter(
zkWorkers,
createValidWorkerPredicate(config, workerSetupData)
);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);

if (targetWorkerCount < 0) {
// Initialize to size of current worker pool
targetWorkerCount = zkWorkers.size();
log.info(
"Starting with a target of %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
}

final boolean atSteadyState = currentlyProvisioning.isEmpty()
&& currentlyTerminating.isEmpty()
&& validWorkers.size() == targetWorkerCount;
final boolean shouldScaleUp = atSteadyState
&& hasTaskPendingBeyondThreshold(pendingTasks)
&& targetWorkerCount < workerSetupData.getMaxNumWorkers();
final boolean shouldScaleDown = atSteadyState
&& Iterables.any(validWorkers, isLazyWorker)
&& targetWorkerCount > workerSetupData.getMinNumWorkers();
if (shouldScaleUp) {
targetWorkerCount++;
log.info(
"I think we should scale up to %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
} else if (shouldScaleDown) {
targetWorkerCount--;
log.info(
"I think we should scale down to %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
} else {
log.info(
"Our target is %,d workers, and I'm okay with that (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
}
}
return false;
}

private boolean hasTaskPendingBeyondThreshold(Collection<RemoteTaskRunnerWorkItem> pendingTasks)
{
synchronized (lock) {
long now = System.currentTimeMillis();
for (TaskRunnerWorkItem pendingTask : pendingTasks) {
final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now);
final Duration timeoutDuration = config.getPendingTaskTimeout().toStandardDuration();
if (durationSinceInsertion.isEqual(timeoutDuration) || durationSinceInsertion.isLongerThan(timeoutDuration)) {
return true;
}
}
return false;
}
}
}
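
Note: the two predicates above split the pool cleanly: a worker is valid if its version meets the configured minimum, and lazy if it has no running tasks and is either idle past the timeout or no longer valid. Raising minVersion in WorkerSetupData therefore turns old workers lazy, which is exactly what testMinVersionIncrease below exercises. A reduced sketch of the two rules as plain booleans (assuming, as the "0"/"1" versions in the tests suggest, that ZkWorker.isValidVersion compares version strings lexicographically):

    class WorkerPredicatesSketch
    {
      // Valid: running at least the required minimum version (lexicographic
      // comparison assumed here).
      static boolean isValid(String workerVersion, String minVersion)
      {
        return workerVersion.compareTo(minVersion) >= 0;
      }

      // Lazy: no running tasks, and either idle too long or not valid anymore.
      static boolean isLazy(int runningTasks, long idleMillis, long idleTimeoutMillis, boolean valid)
      {
        return runningTasks == 0 && (idleMillis >= idleTimeoutMillis || !valid);
      }
    }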

@@ -126,13 +126,11 @@ public class EC2AutoScalingStrategyTest
AutoScalingData created = strategy.provision();

Assert.assertEquals(created.getNodeIds().size(), 1);
Assert.assertEquals(created.getNodes().size(), 1);
Assert.assertEquals("theInstance", created.getNodeIds().get(0));

AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP"));

Assert.assertEquals(deleted.getNodeIds().size(), 1);
Assert.assertEquals(deleted.getNodes().size(), 1);
Assert.assertEquals(String.format("%s:8080", IP), deleted.getNodeIds().get(0));
}
}

@@ -19,6 +19,7 @@

package io.druid.indexing.overlord.scaling;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

@@ -28,6 +29,7 @@ import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;

@@ -63,7 +65,7 @@ public class SimpleResourceManagementStrategyTest
public void setUp() throws Exception
{
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
workerSetupData = new AtomicReference<WorkerSetupData>(
workerSetupData = new AtomicReference<>(
new WorkerSetupData(
"0", 0, 2, null, null, null
)

@@ -105,7 +107,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList(), Lists.newArrayList())
new AutoScalingData(Lists.<String>newArrayList("aNode"))
);
EasyMock.replay(autoScalingStrategy);

@@ -133,7 +135,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"), Lists.newArrayList("faker"))
new AutoScalingData(Lists.<String>newArrayList("fake"))
);
EasyMock.replay(autoScalingStrategy);

@@ -190,7 +192,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject()))
.andReturn(null);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"), Lists.newArrayList("faker"))
new AutoScalingData(Lists.<String>newArrayList("fake"))
);
EasyMock.replay(autoScalingStrategy);

@@ -242,7 +244,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList(), Lists.newArrayList())
new AutoScalingData(Lists.<String>newArrayList())
);
EasyMock.replay(autoScalingStrategy);

@@ -272,7 +274,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2);
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList("ip"), Lists.newArrayList("ip"))
new AutoScalingData(Lists.<String>newArrayList("ip"))
);
EasyMock.replay(autoScalingStrategy);

@@ -309,15 +311,174 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy);
}

@Test
public void testNoActionNeeded() throws Exception
{
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);

boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);

Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);

EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);

boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);

Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
}

@Test
public void testMinVersionIncrease() throws Exception
{
// Don't terminate anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h1", "i2", "0")
)
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);

// Don't provision anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);

// Increase minVersion
workerSetupData.set(new WorkerSetupData("1", 0, 2, null, null, null));

// Provision two new workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h3"))
);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h4"))
);
EasyMock.replay(autoScalingStrategy);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h2", "i2", "0")
)
);
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScalingStrategy);

// Terminate old workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(ImmutableList.of("i1", "i2", "i3", "i4"))).andReturn(
ImmutableList.of("h1", "h2", "h3", "h4")
);
EasyMock.expect(autoScalingStrategy.terminate(ImmutableList.of("i1", "i2"))).andReturn(
new AutoScalingData(ImmutableList.of("h1", "h2"))
);
EasyMock.replay(autoScalingStrategy);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(null, "h1", "i1", "0"),
new TestZkWorker(null, "h2", "i2", "0"),
new TestZkWorker(NoopTask.create(), "h3", "i3", "1"),
new TestZkWorker(NoopTask.create(), "h4", "i4", "1")
)
);
Assert.assertTrue(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
}

@Test
public void testNullWorkerSetupData() throws Exception
{
workerSetupData.set(null);
EasyMock.replay(autoScalingStrategy);

boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
);

boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
);

Assert.assertFalse(terminatedSomething);
Assert.assertFalse(provisionedSomething);

EasyMock.verify(autoScalingStrategy);
}

private static class TestZkWorker extends ZkWorker
{
private final Task testTask;

private TestZkWorker(
public TestZkWorker(
Task testTask
)
{
super(new Worker("host", "ip", 3, "version"), null, new DefaultObjectMapper());
this(testTask, "host", "ip", "0");
}

public TestZkWorker(
Task testTask,
String host,
String ip,
String version
)
{
super(new Worker(host, ip, 3, version), null, new DefaultObjectMapper());

this.testTask = testTask;
}