From 6224577ed12141c1542ac5ccfc202be4862fd48c Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 20 Dec 2013 10:01:32 -0800 Subject: [PATCH] Autoscaling: Terminate obsolete workers faster --- .../SimpleResourceManagementStrategy.java | 43 +++++++++++++------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java index 4f541f113c7..80f7aef94c8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java @@ -56,16 +56,6 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat private final Object lock = new Object(); private final Set currentlyProvisioning = Sets.newHashSet(); private final Set currentlyTerminating = Sets.newHashSet(); - private final Predicate isLazyWorker = new Predicate() - { - @Override - public boolean apply(ZkWorker input) - { - return input.getRunningTasks().isEmpty() - && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis() - >= config.getWorkerIdleTimeout().toStandardDuration().getMillis(); - } - }; private int targetWorkerCount = -1; private DateTime lastProvisionTime = new DateTime(); @@ -94,7 +84,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat log.warn("No workerSetupData available, cannot provision new workers."); return false; } - final Predicate isValidWorker = createValidWorkerPredicate(workerSetupData); + final Predicate isValidWorker = createValidWorkerPredicate(config, workerSetupData); final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size(); final List workerNodeIds = autoScalingStrategy.ipToIdLookup( @@ -156,7 +146,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat public boolean doTerminate(Collection pendingTasks, Collection zkWorkers) { synchronized (lock) { - if (workerSetupDataRef.get() == null) { + final WorkerSetupData workerSetupData = workerSetupDataRef.get(); + if (workerSetupData == null) { log.warn("No workerSetupData available, cannot terminate workers."); return false; } @@ -191,6 +182,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat updateTargetWorkerCount(pendingTasks, zkWorkers); + final Predicate isLazyWorker = createLazyWorkerPredicate(config, workerSetupData); if (currentlyTerminating.isEmpty()) { final int want = zkWorkers.size() - targetWorkerCount; if (want > 0) { @@ -250,7 +242,29 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat return scalingStats; } - private Predicate createValidWorkerPredicate(final WorkerSetupData workerSetupData) + private static Predicate createLazyWorkerPredicate( + final SimpleResourceManagementConfig config, + final WorkerSetupData workerSetupData + ) + { + final Predicate isValidWorker = createValidWorkerPredicate(config, workerSetupData); + + return new Predicate() + { + @Override + public boolean apply(ZkWorker worker) + { + final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis() + >= config.getWorkerIdleTimeout().toStandardDuration().getMillis(); + return worker.getRunningTasks().isEmpty() && (itHasBeenAWhile || !isValidWorker.apply(worker)); + } + }; + } + + private static Predicate createValidWorkerPredicate( + final SimpleResourceManagementConfig config, + final WorkerSetupData workerSetupData + ) { return new Predicate() { @@ -277,8 +291,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat final WorkerSetupData workerSetupData = workerSetupDataRef.get(); final Collection validWorkers = Collections2.filter( zkWorkers, - createValidWorkerPredicate(workerSetupData) + createValidWorkerPredicate(config, workerSetupData) ); + final Predicate isLazyWorker = createLazyWorkerPredicate(config, workerSetupData); if (targetWorkerCount < 0) { // Initialize to size of current worker pool