Autoscaling: Terminate obsolete workers faster

This commit is contained in:
Gian Merlino 2013-12-20 10:01:32 -08:00
parent 4a722c0a6d
commit 6224577ed1
1 changed files with 29 additions and 14 deletions

View File

@ -56,16 +56,6 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private final Object lock = new Object();
private final Set<String> currentlyProvisioning = Sets.newHashSet();
private final Set<String> currentlyTerminating = Sets.newHashSet();
private final Predicate<ZkWorker> isLazyWorker = new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker input)
{
return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
}
};
private int targetWorkerCount = -1;
private DateTime lastProvisionTime = new DateTime();
@ -94,7 +84,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
log.warn("No workerSetupData available, cannot provision new workers.");
return false;
}
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(workerSetupData);
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);
final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
final List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
@ -156,7 +146,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
synchronized (lock) {
if (workerSetupDataRef.get() == null) {
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
if (workerSetupData == null) {
log.warn("No workerSetupData available, cannot terminate workers.");
return false;
}
@ -191,6 +182,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
updateTargetWorkerCount(pendingTasks, zkWorkers);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
if (currentlyTerminating.isEmpty()) {
final int want = zkWorkers.size() - targetWorkerCount;
if (want > 0) {
@ -250,7 +242,29 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return scalingStats;
}
private Predicate<ZkWorker> createValidWorkerPredicate(final WorkerSetupData workerSetupData)
private static Predicate<ZkWorker> createLazyWorkerPredicate(
final SimpleResourceManagementConfig config,
final WorkerSetupData workerSetupData
)
{
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);
return new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker worker)
{
final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
return worker.getRunningTasks().isEmpty() && (itHasBeenAWhile || !isValidWorker.apply(worker));
}
};
}
private static Predicate<ZkWorker> createValidWorkerPredicate(
final SimpleResourceManagementConfig config,
final WorkerSetupData workerSetupData
)
{
return new Predicate<ZkWorker>()
{
@ -277,8 +291,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
final Collection<ZkWorker> validWorkers = Collections2.filter(
zkWorkers,
createValidWorkerPredicate(workerSetupData)
createValidWorkerPredicate(config, workerSetupData)
);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
if (targetWorkerCount < 0) {
// Initialize to size of current worker pool