RTR, HRTR: Fix incorrect maxLazyWorkers check in markLazyWorkers. (#14545)

Recently #14532 fixed a problem when maxLazyWorkers == 0 and lazyWorkers
starts out empty. Unfortunately, even after that patch, there remained
a more general version of this problem when maxLazyWorkers == lazyWorkers.size().
This patch fixes it.

I'm not sure if this would actually happen in production, because the
provisioning strategies do try to avoid calling markWorkersLazy until
previously-initiated terminations have finished. Nevertheless, it still
seems like a good thing to fix.
This commit is contained in:
Gian Merlino 2023-07-07 10:08:12 -07:00 committed by GitHub
parent 9e617373a0
commit 021a01df45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 18 deletions

View File

@ -1401,29 +1401,31 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
{ {
// skip the lock and bail early if we should not mark any workers lazy (e.g. number // skip the lock and bail early if we should not mark any workers lazy (e.g. number
// of current workers is at or below the minNumWorkers of autoscaler config) // of current workers is at or below the minNumWorkers of autoscaler config)
if (maxLazyWorkers < 1) { if (lazyWorkers.size() >= maxLazyWorkers) {
return Collections.emptyList(); return getLazyWorkers();
} }
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
// Search for new workers to mark lazy.
// Status lock is used to prevent any tasks being assigned to workers while we mark them lazy
synchronized (statusLock) { synchronized (statusLock) {
for (Map.Entry<String, ZkWorker> worker : zkWorkers.entrySet()) { for (Map.Entry<String, ZkWorker> worker : zkWorkers.entrySet()) {
if (lazyWorkers.size() >= maxLazyWorkers) {
break;
}
final ZkWorker zkWorker = worker.getValue(); final ZkWorker zkWorker = worker.getValue();
try { try {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) { if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost()); log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker.getKey(), zkWorker); lazyWorkers.put(worker.getKey(), zkWorker);
if (lazyWorkers.size() == maxLazyWorkers) {
// only mark excess workers as lazy and allow their cleanup
break;
}
} }
} }
catch (Exception e) { catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
return getWorkerFromZK(lazyWorkers.values());
} }
return getLazyWorkers();
} }
protected List<String> getAssignedTasks(Worker worker) throws Exception protected List<String> getAssignedTasks(Worker worker) throws Exception

View File

@ -26,6 +26,7 @@ import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.Worker;
import java.util.Collection; import java.util.Collection;
import java.util.List;
@PublicApi @PublicApi
public interface WorkerTaskRunner extends TaskRunner public interface WorkerTaskRunner extends TaskRunner
@ -47,10 +48,16 @@ public interface WorkerTaskRunner extends TaskRunner
Collection<Worker> getLazyWorkers(); Collection<Worker> getLazyWorkers();
/** /**
* Check which workers can be marked as lazy * Mark workers matching a predicate as lazy, up to a maximum. If the number of workers previously marked lazy is
* equal to or higher than the provided maximum, this method will return those previously marked workers and will
* not mark any additional workers. Workers are never un-marked lazy once they are marked lazy.
*
* This method is called by {@link org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy}
* implementations. It is expected that the lazy workers returned by this method will be terminated using
* {@link org.apache.druid.indexing.overlord.autoscaling.AutoScaler#terminate(List)}.
* *
* @param isLazyWorker predicate that checks if a worker is lazy * @param isLazyWorker predicate that checks if a worker is lazy
* @param maxLazyWorkers maximum number of lazy workers to return * @param maxLazyWorkers desired maximum number of lazy workers (actual number may be higher)
*/ */
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers); Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers);

View File

@ -96,7 +96,6 @@ import java.io.InputStream;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -927,29 +926,31 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
{ {
// skip the lock and bail early if we should not mark any workers lazy (e.g. number // skip the lock and bail early if we should not mark any workers lazy (e.g. number
// of current workers is at or below the minNumWorkers of autoscaler config) // of current workers is at or below the minNumWorkers of autoscaler config)
if (maxLazyWorkers < 1) { if (lazyWorkers.size() >= maxLazyWorkers) {
return Collections.emptyList(); return getLazyWorkers();
} }
// Search for new workers to mark lazy.
// Status lock is used to prevent any tasks being assigned to workers while we mark them lazy
synchronized (statusLock) { synchronized (statusLock) {
for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) { for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) {
if (lazyWorkers.size() >= maxLazyWorkers) {
break;
}
final WorkerHolder workerHolder = worker.getValue(); final WorkerHolder workerHolder = worker.getValue();
try { try {
if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) { if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost()); log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost());
lazyWorkers.put(worker.getKey(), workerHolder); lazyWorkers.put(worker.getKey(), workerHolder);
if (lazyWorkers.size() == maxLazyWorkers) {
// only mark excess workers as lazy and allow their cleanup
break;
}
} }
} }
catch (Exception e) { catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
return getLazyWorkers();
} }
return getLazyWorkers();
} }
private boolean isWorkerOkForMarkingLazy(Worker worker) private boolean isWorkerOkForMarkingLazy(Worker worker)