perf: improve RemoteTaskRunner task assignment loop performance (#12096)

* perf: improve ZkWorker task lookup performance

This improves the performance of the ZkWorker task lookup loop by
eliminating repeat calls to getRunningTasks() in toImmutable(),
and reduces the work performed in isRunningTask() to stream-parse
the id field instead of entire JSON blob.
This commit is contained in:
Jason Koch 2022-03-02 09:38:32 -08:00 committed by GitHub
parent 1af4c9c933
commit f594e7ac24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 215 additions and 10 deletions

View File

@ -20,6 +20,8 @@
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@ -44,6 +46,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* Holds information about a worker and a listener for task status changes associated with the worker.
@ -52,6 +55,7 @@ public class ZkWorker implements Closeable
{
private final PathChildrenCache statusCache;
private final Function<ChildData, TaskAnnouncement> cacheConverter;
private final java.util.function.Function<ChildData, String> taskIdExtractor;
private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<>(DateTimes.nowUtc());
@ -64,6 +68,33 @@ public class ZkWorker implements Closeable
this.statusCache = statusCache;
this.cacheConverter = (ChildData input) ->
JacksonUtils.readValue(jsonMapper, input.getData(), TaskAnnouncement.class);
this.taskIdExtractor = createTaskIdExtractor(jsonMapper);
}
static java.util.function.Function<ChildData, String> createTaskIdExtractor(final ObjectMapper jsonMapper)
{
return (ChildData input) -> {
try (JsonParser parser = jsonMapper.getFactory().createParser(input.getData())) {
while (parser.nextToken() != JsonToken.END_OBJECT) {
String currentName = parser.getCurrentName();
if (currentName == null) {
continue;
}
switch (currentName) {
case TaskAnnouncement.TASK_ID_KEY:
parser.nextToken();
return parser.getValueAsString();
default:
parser.skipChildren();
}
}
return null;
}
catch (IOException e) {
throw new RuntimeException(e);
}
};
}
public void start() throws Exception
@ -85,7 +116,10 @@ public class ZkWorker implements Closeable
@JsonProperty("runningTasks")
public Collection<String> getRunningTaskIds()
{
return getRunningTasks().keySet();
return statusCache.getCurrentData()
.stream()
.map(taskIdExtractor)
.collect(Collectors.toSet());
}
public Map<String, TaskAnnouncement> getRunningTasks()
@ -103,9 +137,14 @@ public class ZkWorker implements Closeable
@JsonProperty("currCapacityUsed")
public int getCurrCapacityUsed()
{
return getCurrCapacityUsed(getRunningTasks());
}
private static int getCurrCapacityUsed(Map<String, TaskAnnouncement> tasks)
{
int currCapacity = 0;
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
for (TaskAnnouncement taskAnnouncement : tasks.values()) {
currCapacity += taskAnnouncement.getTaskResource().getRequiredCapacity();
}
return currCapacity;
@ -113,9 +152,14 @@ public class ZkWorker implements Closeable
@JsonProperty("currParallelIndexCapacityUsed")
public int getCurrParallelIndexCapacityUsed()
{
return getCurrParallelIndexCapacityUsed(getRunningTasks());
}
private int getCurrParallelIndexCapacityUsed(Map<String, TaskAnnouncement> tasks)
{
int currParallelIndexCapacityUsed = 0;
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
for (TaskAnnouncement taskAnnouncement : tasks.values()) {
if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) {
currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity();
}
@ -125,9 +169,14 @@ public class ZkWorker implements Closeable
@JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups()
{
return getAvailabilityGroups(getRunningTasks());
}
private static Set<String> getAvailabilityGroups(Map<String, TaskAnnouncement> tasks)
{
Set<String> retVal = new HashSet<>();
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
for (TaskAnnouncement taskAnnouncement : tasks.values()) {
retVal.add(taskAnnouncement.getTaskResource().getAvailabilityGroup());
}
return retVal;
@ -147,7 +196,10 @@ public class ZkWorker implements Closeable
public boolean isRunningTask(String taskId)
{
return getRunningTasks().containsKey(taskId);
return statusCache.getCurrentData()
.stream()
.map(taskIdExtractor)
.anyMatch((String s) -> taskId.equals(s));
}
@UsedInGeneratedCode // See JavaScriptWorkerSelectStrategyTest
@ -177,13 +229,14 @@ public class ZkWorker implements Closeable
public ImmutableWorkerInfo toImmutable()
{
Map<String, TaskAnnouncement> tasks = getRunningTasks();
return new ImmutableWorkerInfo(
worker.get(),
getCurrCapacityUsed(),
getCurrParallelIndexCapacityUsed(),
getAvailabilityGroups(),
getRunningTaskIds(),
getCurrCapacityUsed(tasks),
getCurrParallelIndexCapacityUsed(tasks),
getAvailabilityGroups(tasks),
tasks.keySet(),
lastCompletedTaskTime.get(),
blacklistedUntil.get()
);

View File

@ -35,6 +35,8 @@ import javax.annotation.Nullable;
*/
public class TaskAnnouncement
{
public static final String TASK_ID_KEY = "id";
private final String taskType;
private final TaskStatus taskStatus;
private final TaskResource taskResource;
@ -63,7 +65,7 @@ public class TaskAnnouncement
@JsonCreator
private TaskAnnouncement(
@JsonProperty("id") String taskId,
@JsonProperty(TASK_ID_KEY) String taskId,
@JsonProperty("type") String taskType,
@JsonProperty("status") TaskState status,
@JsonProperty("taskStatus") TaskStatus taskStatus,

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.zookeeper.data.Stat;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.function.Function;
public class ZkWorkerTest
{
Function<ChildData, String> extract;
@Before
public void setup()
{
ObjectMapper mapper = new DefaultObjectMapper();
extract = ZkWorker.createTaskIdExtractor(mapper);
}
ChildData prepare(String input)
{
String replaced = StringUtils.format(StringUtils.replaceChar(input, '\'', "\""), TaskAnnouncement.TASK_ID_KEY);
byte[] data = StringUtils.toUtf8(replaced);
return new ChildData("/a/b/c", new Stat(), data);
}
@Test
public void testShallowObjectWithIdFirst()
{
ChildData input = prepare("{'%s': 'abcd', 'status': 'RUNNING'}");
String actual = extract.apply(input);
Assert.assertEquals("abcd", actual);
}
@Test
public void testShallowObjectWithIdMiddle()
{
ChildData input = prepare("{'before': 'something', '%s': 'abcd', 'status': 'RUNNING'}");
String actual = extract.apply(input);
Assert.assertEquals("abcd", actual);
}
@Test
public void testShallowObjectWithIdLast()
{
ChildData input = prepare("{'before': 'something', 'status': 'RUNNING', '%s': 'abcd'}");
String actual = extract.apply(input);
Assert.assertEquals("abcd", actual);
}
@Test
public void testShallowObjectWithNoId()
{
ChildData input = prepare("{'before': 'something', 'status': 'RUNNING'}");
String actual = extract.apply(input);
Assert.assertNull(actual);
}
@Test
public void testDeepObjectWithIdFirst()
{
ChildData input = prepare("{'%s': 'abcd', 'subobject': { 'subkey': 'subvalue' }, 'subarray': [{'key': 'val'}, 2, 3], 'status': 'RUNNING'}");
String actual = extract.apply(input);
Assert.assertEquals("abcd", actual);
}
@Test
public void testDeepObjectWithIdLast()
{
ChildData input = prepare("{'subobject': { 'subkey': 'subvalue' }, 'subarray': [{'key': 'val'}, 2, 3], 'status': 'RUNNING', '%s': 'abcd'}");
String actual = extract.apply(input);
Assert.assertEquals("abcd", actual);
}
@Test
public void testDeepObjectWithIdInNestedOnly()
{
ChildData input = prepare("{'subobject': { '%s': 'defg' }, 'subarray': [{'key': 'val'}, 2, 3], 'status': 'RUNNING'}");
String actual = extract.apply(input);
Assert.assertNull(actual);
}
@Test
public void testDeepObjectWithIdInNestedAndOuter()
{
ChildData input = prepare("{'subobject': { '%s': 'defg' }, 'subarray': [{'key': 'val'}, 2, 3], 'status': 'RUNNING', '%1$s': 'abcd'}");
String actual = extract.apply(input);
Assert.assertEquals("abcd", actual);
}
@Test
public void testIdWithWrongTypeReturnsNull()
{
ChildData input = prepare("{'%s': {'nested': 'obj'}'");
String actual = extract.apply(input);
Assert.assertNull(actual);
}
@Test
public void testCanReadIdFromAJacksonSerializedTaskAnnouncement() throws JsonProcessingException
{
final String expectedTaskId = "task01234";
Task task0 = NoopTask.create(expectedTaskId, 0);
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(
task0,
TaskStatus.running(task0.getId()),
TaskLocation.unknown()
);
ObjectMapper objectMapper = new ObjectMapper();
byte[] serialized = objectMapper.writeValueAsBytes(taskAnnouncement);
ChildData zkNode = new ChildData("/a/b/c", new Stat(), serialized);
String actualExtractedTaskId = extract.apply(zkNode);
Assert.assertEquals(expectedTaskId, actualExtractedTaskId);
}
}