HttpRemoteTaskRunner: Fix markLazyWorkers for maxLazyWorkers == 0. (#14532)

This commit is contained in:
Gian Merlino 2023-07-06 11:51:04 -07:00 committed by GitHub
parent d02bb8bb6e
commit 037f09bef2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 27 additions and 6 deletions

View File

@ -1397,11 +1397,11 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
@Override
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers)
{
// skip the lock and bail early if we should not mark any workers lazy (e.g. number
// of current workers is at or below the minNumWorkers of autoscaler config)
if (maxWorkers < 1) {
if (maxLazyWorkers < 1) {
return Collections.emptyList();
}
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
@ -1412,7 +1412,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker.getKey(), zkWorker);
if (lazyWorkers.size() == maxWorkers) {
if (lazyWorkers.size() == maxLazyWorkers) {
// only mark excess workers as lazy and allow their cleanup
break;
}

View File

@ -48,8 +48,11 @@ public interface WorkerTaskRunner extends TaskRunner
/**
* Check which workers can be marked as lazy
*
* @param isLazyWorker predicate that checks if a worker is lazy
* @param maxLazyWorkers maximum number of lazy workers to return
*/
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers);
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers);
WorkerTaskRunnerConfig getConfig();

View File

@ -96,6 +96,7 @@ import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -922,8 +923,14 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
@Override
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers)
{
// skip the lock and bail early if we should not mark any workers lazy (e.g. number
// of current workers is at or below the minNumWorkers of autoscaler config)
if (maxLazyWorkers < 1) {
return Collections.emptyList();
}
synchronized (statusLock) {
for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) {
final WorkerHolder workerHolder = worker.getValue();
@ -931,7 +938,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost());
lazyWorkers.put(worker.getKey(), workerHolder);
if (lazyWorkers.size() == maxWorkers) {
if (lazyWorkers.size() == maxLazyWorkers) {
// only mark excess workers as lazy and allow their cleanup
break;
}

View File

@ -1105,6 +1105,17 @@ public class HttpRemoteTaskRunnerTest
Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());
Assert.assertEquals(
Collections.emptyList(),
taskRunner.markWorkersLazy(Predicates.alwaysTrue(), 0)
);
Assert.assertEquals(
"host3:8080",
Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), 1))
.getHost()
);
Assert.assertEquals(
"host3:8080",
Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), Integer.MAX_VALUE))