mirror of https://github.com/apache/druid.git
Autoscaling fixes.
- Initial targetWorkerCount must be subject to pool size limits - Use consistent workerSetupData for the entire autoscaling run - Don't call terminate() when we have nothing to terminate - Terminate obsolete workers even faster
This commit is contained in:
parent
c503f5e9c5
commit
e5b8546d19
|
@ -104,7 +104,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
);
|
||||
currentlyProvisioning.removeAll(workerNodeIds);
|
||||
|
||||
updateTargetWorkerCount(pendingTasks, zkWorkers);
|
||||
updateTargetWorkerCount(workerSetupData, pendingTasks, zkWorkers);
|
||||
|
||||
int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
|
||||
while (want > 0) {
|
||||
|
@ -180,16 +180,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
currentlyTerminating.clear();
|
||||
currentlyTerminating.addAll(stillExisting);
|
||||
|
||||
updateTargetWorkerCount(pendingTasks, zkWorkers);
|
||||
updateTargetWorkerCount(workerSetupData, pendingTasks, zkWorkers);
|
||||
|
||||
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
|
||||
if (currentlyTerminating.isEmpty()) {
|
||||
final int want = zkWorkers.size() - targetWorkerCount;
|
||||
if (want > 0) {
|
||||
final int excessWorkers = (zkWorkers.size() + currentlyProvisioning.size()) - targetWorkerCount;
|
||||
if (excessWorkers > 0) {
|
||||
final List<String> laziestWorkerIps =
|
||||
FluentIterable.from(zkWorkers)
|
||||
.filter(isLazyWorker)
|
||||
.limit(want)
|
||||
.limit(excessWorkers)
|
||||
.transform(
|
||||
new Function<ZkWorker, String>()
|
||||
{
|
||||
|
@ -202,10 +202,13 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
)
|
||||
.toList();
|
||||
|
||||
if (laziestWorkerIps.isEmpty()) {
|
||||
log.info("Wanted to terminate %,d workers, but couldn't find any lazy ones!");
|
||||
} else {
|
||||
log.info(
|
||||
"Terminating %,d workers (wanted %,d): %s",
|
||||
laziestWorkerIps.size(),
|
||||
want,
|
||||
excessWorkers,
|
||||
Joiner.on(", ").join(laziestWorkerIps)
|
||||
);
|
||||
|
||||
|
@ -217,6 +220,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
didTerminate = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());
|
||||
|
||||
|
@ -283,12 +287,12 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
}
|
||||
|
||||
private void updateTargetWorkerCount(
|
||||
final WorkerSetupData workerSetupData,
|
||||
final Collection<RemoteTaskRunnerWorkItem> pendingTasks,
|
||||
final Collection<ZkWorker> zkWorkers
|
||||
)
|
||||
{
|
||||
synchronized (lock) {
|
||||
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
|
||||
final Collection<ZkWorker> validWorkers = Collections2.filter(
|
||||
zkWorkers,
|
||||
createValidWorkerPredicate(config, workerSetupData)
|
||||
|
@ -296,8 +300,14 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
|
||||
|
||||
if (targetWorkerCount < 0) {
|
||||
// Initialize to size of current worker pool
|
||||
targetWorkerCount = zkWorkers.size();
|
||||
// Initialize to size of current worker pool, subject to pool size limits
|
||||
targetWorkerCount = Math.max(
|
||||
Math.min(
|
||||
zkWorkers.size(),
|
||||
workerSetupData.getMaxNumWorkers()
|
||||
),
|
||||
workerSetupData.getMinNumWorkers()
|
||||
);
|
||||
log.info(
|
||||
"Starting with a target of %,d workers (current = %,d, min = %,d, max = %,d).",
|
||||
targetWorkerCount,
|
||||
|
|
Loading…
Reference in New Issue