More efficient generation of ImmutableWorkerHolder from WorkerHolder. (#14546)

* More efficient generation of ImmutableWorkerHolder from WorkerHolder.

Taking the work done in #12096 a little further:

1) Applying a similar optimization to WorkerHolder (HttpRemoteTaskRunner).
   The original patch only helped with the ZkWorker (RemoteTaskRunner).

2) Improve the ZkWorker version somewhat by avoiding multiple iterations
   through the task announcements map.

* Pick better names and use better logic.

* Only runnable tasks.

* Fix test.

* Fix testBlacklistZKWorkers50Percent.
This commit is contained in:
Gian Merlino 2023-07-13 07:57:16 -07:00 committed by GitHub
parent 7650a71d37
commit 450ecd6370
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 103 additions and 83 deletions

View File

@ -20,16 +20,20 @@
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
@ -44,6 +48,8 @@ public class ImmutableWorkerInfo
private final ImmutableSet<String> availabilityGroups;
private final ImmutableSet<String> runningTasks;
private final DateTime lastCompletedTaskTime;
@Nullable
private final DateTime blacklistedUntil;
@JsonCreator
@ -76,7 +82,8 @@ public class ImmutableWorkerInfo
)
{
this(worker, currCapacityUsed, currParallelIndexCapacityUsed, availabilityGroups,
runningTasks, lastCompletedTaskTime, null);
runningTasks, lastCompletedTaskTime, null
);
}
public ImmutableWorkerInfo(
@ -90,6 +97,51 @@ public class ImmutableWorkerInfo
this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks, lastCompletedTaskTime, null);
}
/**
* Helper used by {@link ZkWorker} and {@link org.apache.druid.indexing.overlord.hrtr.WorkerHolder}.
*/
public static ImmutableWorkerInfo fromWorkerAnnouncements(
final Worker worker,
final Map<String, TaskAnnouncement> announcements,
final DateTime lastCompletedTaskTime,
@Nullable final DateTime blacklistedUntil
)
{
int currCapacityUsed = 0;
int currParallelIndexCapacityUsed = 0;
ImmutableSet.Builder<String> taskIds = ImmutableSet.builder();
ImmutableSet.Builder<String> availabilityGroups = ImmutableSet.builder();
for (final Map.Entry<String, TaskAnnouncement> entry : announcements.entrySet()) {
final TaskAnnouncement announcement = entry.getValue();
if (announcement.getStatus().isRunnable()) {
final String taskId = entry.getKey();
final TaskResource taskResource = announcement.getTaskResource();
final int requiredCapacity = taskResource.getRequiredCapacity();
currCapacityUsed += requiredCapacity;
if (ParallelIndexSupervisorTask.TYPE.equals(announcement.getTaskType())) {
currParallelIndexCapacityUsed += requiredCapacity;
}
taskIds.add(taskId);
availabilityGroups.add(taskResource.getAvailabilityGroup());
}
}
return new ImmutableWorkerInfo(
worker,
currCapacityUsed,
currParallelIndexCapacityUsed,
availabilityGroups.build(),
taskIds.build(),
lastCompletedTaskTime,
blacklistedUntil
);
}
@JsonProperty("worker")
public Worker getWorker()
{
@ -132,6 +184,7 @@ public class ImmutableWorkerInfo
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public DateTime getBlacklistedUntil()
{
return blacklistedUntil;

View File

@ -523,6 +523,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return Optional.fromNullable(provisioningService.getStats());
}
@Nullable
public ZkWorker findWorkerRunningTask(String taskId)
{
for (ZkWorker zkWorker : zkWorkers.values()) {
@ -533,6 +534,15 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return null;
}
/**
* Retrieve {@link ZkWorker} based on an ID (host), or null if the ID doesn't exist.
*/
@Nullable
ZkWorker findWorkerId(String workerId)
{
return zkWorkers.get(workerId);
}
public boolean isWorkerRunningTask(ZkWorker worker, String taskId)
{
return Preconditions.checkNotNull(worker, "worker").isRunningTask(taskId);

View File

@ -229,14 +229,9 @@ public class ZkWorker implements Closeable
public ImmutableWorkerInfo toImmutable()
{
Map<String, TaskAnnouncement> tasks = getRunningTasks();
return new ImmutableWorkerInfo(
return ImmutableWorkerInfo.fromWorkerAnnouncements(
worker.get(),
getCurrCapacityUsed(tasks),
getCurrParallelIndexCapacityUsed(tasks),
getAvailabilityGroups(tasks),
tasks.keySet(),
getRunningTasks(),
lastCompletedTaskTime.get(),
blacklistedUntil.get()
);

View File

@ -49,6 +49,7 @@ import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -494,9 +495,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
),
Sets.union(
immutableWorker.getRunningTasks(),
Sets.newHashSet(
task.getId()
)
Collections.singleton(task.getId())
),
DateTimes.nowUtc()
);

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
@ -47,10 +46,8 @@ import org.joda.time.DateTime;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@ -58,7 +55,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
*/
@ -133,42 +129,6 @@ public class WorkerHolder
return worker;
}
private Map<String, TaskAnnouncement> getRunningTasks()
{
return tasksSnapshotRef.get().entrySet().stream().filter(
e -> e.getValue().getTaskStatus().isRunnable()
).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}
private int getCurrCapacityUsed()
{
int currCapacity = 0;
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
currCapacity += taskAnnouncement.getTaskResource().getRequiredCapacity();
}
return currCapacity;
}
private int getCurrParallelIndexCapcityUsed()
{
int currParallelIndexCapacityUsed = 0;
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) {
currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity();
}
}
return currParallelIndexCapacityUsed;
}
private Set<String> getAvailabilityGroups()
{
Set<String> retVal = new HashSet<>();
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
retVal.add(taskAnnouncement.getTaskResource().getAvailabilityGroup());
}
return retVal;
}
public DateTime getBlacklistedUntil()
{
return blacklistedUntil.get();
@ -201,12 +161,9 @@ public class WorkerHolder
w = disabledWorker;
}
return new ImmutableWorkerInfo(
return ImmutableWorkerInfo.fromWorkerAnnouncements(
w,
getCurrCapacityUsed(),
getCurrParallelIndexCapcityUsed(),
getAvailabilityGroups(),
getRunningTasks().keySet(),
tasksSnapshotRef.get(),
lastCompletedTaskTime.get(),
blacklistedUntil.get()
);

View File

@ -126,7 +126,7 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest
tasks[5] = TestTasks.unending("task5");
results[5] = remoteTaskRunner.run(tasks[5]);
waitForOneWorkerToHaveUnackedTasks();
if (rtrTestUtils.taskAnnounced("worker0", tasks[5].getId())) {
if (rtrTestUtils.taskAssigned("worker0", tasks[5].getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[5]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[5]);
} else {
@ -138,7 +138,7 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest
private void mockWorkerRunningAndCompletionSuccessfulTasks(Task t1, Task t2) throws Exception
{
if (rtrTestUtils.taskAnnounced("worker0", t1.getId())) {
if (rtrTestUtils.taskAssigned("worker0", t1.getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", t1);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", t1);
rtrTestUtils.mockWorkerRunningTask("worker1", t2);

View File

@ -101,7 +101,8 @@ public class RemoteTaskRunnerTest
private Worker worker;
@Rule
public TestRule watcher = new TestWatcher() {
public TestRule watcher = new TestWatcher()
{
@Override
protected void starting(Description description)
{
@ -621,7 +622,7 @@ public class RemoteTaskRunnerTest
private boolean taskAnnounced(final String taskId)
{
return rtrTestUtils.taskAnnounced(WORKER_HOST, taskId);
return rtrTestUtils.taskAssigned(WORKER_HOST, taskId);
}
private boolean workerRunningTask(final String taskId)
@ -890,8 +891,8 @@ public class RemoteTaskRunnerTest
}
/**
* With 2 workers and maxPercentageBlacklistWorkers(25), neither worker should ever be blacklisted even after
* exceeding maxRetriesBeforeBlacklist.
* With 2 workers and maxPercentageBlacklistWorkers(25), no worker should be blacklisted even after exceeding
* maxRetriesBeforeBlacklist.
*/
@Test
public void testBlacklistZKWorkers25Percent() throws Exception
@ -904,8 +905,7 @@ public class RemoteTaskRunnerTest
makeRemoteTaskRunner(rtrConfig);
String firstWorker = null;
String secondWorker = null;
String assignedWorker = null;
for (int i = 1; i < 13; i++) {
String taskId = StringUtils.format("rt-%d", i);
@ -920,26 +920,23 @@ public class RemoteTaskRunnerTest
Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);
if (i == 1) {
if (rtrTestUtils.taskAnnounced("worker2", task.getId())) {
firstWorker = "worker2";
secondWorker = "worker";
if (rtrTestUtils.taskAssigned("worker2", task.getId())) {
assignedWorker = "worker2";
} else {
firstWorker = "worker";
secondWorker = "worker2";
assignedWorker = "worker";
}
}
final String expectedWorker = i % 2 == 0 ? secondWorker : firstWorker;
Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, task.getId()));
rtrTestUtils.mockWorkerRunningTask(expectedWorker, task);
rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task);
Assert.assertTrue(rtrTestUtils.taskAssigned(assignedWorker, task.getId()));
rtrTestUtils.mockWorkerRunningTask(assignedWorker, task);
rtrTestUtils.mockWorkerCompleteFailedTask(assignedWorker, task);
Assert.assertTrue(taskFuture.get().isFailure());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(
((i + 1) / 2),
remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount()
i,
remoteTaskRunner.findWorkerId("worker").getContinuouslyFailedTasksCount()
+ remoteTaskRunner.findWorkerId("worker2").getContinuouslyFailedTasksCount()
);
}
}
@ -975,7 +972,7 @@ public class RemoteTaskRunnerTest
Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);
if (i == 1) {
if (rtrTestUtils.taskAnnounced("worker2", task.getId())) {
if (rtrTestUtils.taskAssigned("worker2", task.getId())) {
firstWorker = "worker2";
secondWorker = "worker";
} else {
@ -984,17 +981,26 @@ public class RemoteTaskRunnerTest
}
}
final String expectedWorker = i % 2 == 0 || i > 4 ? secondWorker : firstWorker;
final String expectedWorker = i > 2 ? secondWorker : firstWorker;
Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, task.getId()));
Assert.assertTrue(
StringUtils.format("Task[%s] assigned to worker[%s]", i, expectedWorker),
rtrTestUtils.taskAssigned(expectedWorker, task.getId())
);
rtrTestUtils.mockWorkerRunningTask(expectedWorker, task);
rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task);
Assert.assertTrue(taskFuture.get().isFailure());
Assert.assertEquals(i > 2 ? 1 : 0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(
i > 4 ? i - 2 : ((i + 1) / 2),
remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount()
StringUtils.format("Blacklisted workers after task[%s]", i),
i >= 2 ? 1 : 0,
remoteTaskRunner.getBlackListedWorkers().size()
);
Assert.assertEquals(
StringUtils.format("Continuously failed tasks after task[%s]", i),
i,
remoteTaskRunner.findWorkerId("worker").getContinuouslyFailedTasksCount()
+ remoteTaskRunner.findWorkerId("worker2").getContinuouslyFailedTasksCount()
);
}
}

View File

@ -219,7 +219,7 @@ public class RemoteTaskRunnerTestUtils
return pathExists(JOINER.join(STATUS_PATH, workerId, taskId));
}
boolean taskAnnounced(final String workerId, final String taskId)
boolean taskAssigned(final String workerId, final String taskId)
{
return pathExists(JOINER.join(TASKS_PATH, workerId, taskId));
}