From f594e7ac243f21acd0b9cb5dd795b6088cad6b27 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Wed, 2 Mar 2022 09:38:32 -0800 Subject: [PATCH] perf: improve RemoteTaskRunner task assignment loop performance (#12096) * perf: improve ZkWorker task lookup performance This improves the performance of the ZkWorker task lookup loop by eliminating repeated calls to getRunningTasks() in toImmutable(), and reduces the work performed in isRunningTask() by stream-parsing only the id field instead of deserializing the entire JSON blob. --- .../druid/indexing/overlord/ZkWorker.java | 71 +++++++-- .../indexing/worker/TaskAnnouncement.java | 4 +- .../druid/indexing/overlord/ZkWorkerTest.java | 150 ++++++++++++++++++ 3 files changed, 215 insertions(+), 10 deletions(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/overlord/ZkWorkerTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java index a875090046e..67d30ebc8d4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java @@ -20,6 +20,8 @@ package org.apache.druid.indexing.overlord; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -44,6 +46,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; /** * Holds information about a worker and a listener for task status changes associated with the worker. 
@@ -52,6 +55,7 @@ public class ZkWorker implements Closeable { private final PathChildrenCache statusCache; private final Function cacheConverter; + private final java.util.function.Function taskIdExtractor; private AtomicReference worker; private AtomicReference lastCompletedTaskTime = new AtomicReference<>(DateTimes.nowUtc()); @@ -64,6 +68,33 @@ public class ZkWorker implements Closeable this.statusCache = statusCache; this.cacheConverter = (ChildData input) -> JacksonUtils.readValue(jsonMapper, input.getData(), TaskAnnouncement.class); + this.taskIdExtractor = createTaskIdExtractor(jsonMapper); + } + + static java.util.function.Function createTaskIdExtractor(final ObjectMapper jsonMapper) + { + return (ChildData input) -> { + try (JsonParser parser = jsonMapper.getFactory().createParser(input.getData())) { + while (parser.nextToken() != JsonToken.END_OBJECT) { + String currentName = parser.getCurrentName(); + if (currentName == null) { + continue; + } + + switch (currentName) { + case TaskAnnouncement.TASK_ID_KEY: + parser.nextToken(); + return parser.getValueAsString(); + default: + parser.skipChildren(); + } + } + return null; + } + catch (IOException e) { + throw new RuntimeException(e); + } + }; } public void start() throws Exception @@ -85,7 +116,10 @@ public class ZkWorker implements Closeable @JsonProperty("runningTasks") public Collection getRunningTaskIds() { - return getRunningTasks().keySet(); + return statusCache.getCurrentData() + .stream() + .map(taskIdExtractor) + .collect(Collectors.toSet()); } public Map getRunningTasks() @@ -103,9 +137,14 @@ public class ZkWorker implements Closeable @JsonProperty("currCapacityUsed") public int getCurrCapacityUsed() + { + return getCurrCapacityUsed(getRunningTasks()); + } + + private static int getCurrCapacityUsed(Map tasks) { int currCapacity = 0; - for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) { + for (TaskAnnouncement taskAnnouncement : tasks.values()) { currCapacity += 
taskAnnouncement.getTaskResource().getRequiredCapacity(); } return currCapacity; @@ -113,9 +152,14 @@ public class ZkWorker implements Closeable @JsonProperty("currParallelIndexCapacityUsed") public int getCurrParallelIndexCapacityUsed() + { + return getCurrParallelIndexCapacityUsed(getRunningTasks()); + } + + private int getCurrParallelIndexCapacityUsed(Map tasks) { int currParallelIndexCapacityUsed = 0; - for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) { + for (TaskAnnouncement taskAnnouncement : tasks.values()) { if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) { currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity(); } @@ -125,9 +169,14 @@ public class ZkWorker implements Closeable @JsonProperty("availabilityGroups") public Set getAvailabilityGroups() + { + return getAvailabilityGroups(getRunningTasks()); + } + + private static Set getAvailabilityGroups(Map tasks) { Set retVal = new HashSet<>(); - for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) { + for (TaskAnnouncement taskAnnouncement : tasks.values()) { retVal.add(taskAnnouncement.getTaskResource().getAvailabilityGroup()); } return retVal; @@ -147,7 +196,10 @@ public class ZkWorker implements Closeable public boolean isRunningTask(String taskId) { - return getRunningTasks().containsKey(taskId); + return statusCache.getCurrentData() + .stream() + .map(taskIdExtractor) + .anyMatch((String s) -> taskId.equals(s)); } @UsedInGeneratedCode // See JavaScriptWorkerSelectStrategyTest @@ -177,13 +229,14 @@ public class ZkWorker implements Closeable public ImmutableWorkerInfo toImmutable() { + Map tasks = getRunningTasks(); return new ImmutableWorkerInfo( worker.get(), - getCurrCapacityUsed(), - getCurrParallelIndexCapacityUsed(), - getAvailabilityGroups(), - getRunningTaskIds(), + getCurrCapacityUsed(tasks), + getCurrParallelIndexCapacityUsed(tasks), + getAvailabilityGroups(tasks), + tasks.keySet(), 
lastCompletedTaskTime.get(), blacklistedUntil.get() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/TaskAnnouncement.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/TaskAnnouncement.java index f99fce8b587..7d6842b4ca7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/TaskAnnouncement.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/TaskAnnouncement.java @@ -35,6 +35,8 @@ import javax.annotation.Nullable; */ public class TaskAnnouncement { + public static final String TASK_ID_KEY = "id"; + private final String taskType; private final TaskStatus taskStatus; private final TaskResource taskResource; @@ -63,7 +65,7 @@ public class TaskAnnouncement @JsonCreator private TaskAnnouncement( - @JsonProperty("id") String taskId, + @JsonProperty(TASK_ID_KEY) String taskId, @JsonProperty("type") String taskType, @JsonProperty("status") TaskState status, @JsonProperty("taskStatus") TaskStatus taskStatus, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ZkWorkerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ZkWorkerTest.java new file mode 100644 index 00000000000..a051737a0ed --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ZkWorkerTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.worker.TaskAnnouncement; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.zookeeper.data.Stat; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.function.Function; + +public class ZkWorkerTest +{ + Function extract; + + @Before + public void setup() + { + ObjectMapper mapper = new DefaultObjectMapper(); + extract = ZkWorker.createTaskIdExtractor(mapper); + } + + ChildData prepare(String input) + { + String replaced = StringUtils.format(StringUtils.replaceChar(input, '\'', "\""), TaskAnnouncement.TASK_ID_KEY); + byte[] data = StringUtils.toUtf8(replaced); + return new ChildData("/a/b/c", new Stat(), data); + } + + @Test + public void testShallowObjectWithIdFirst() + { + ChildData input = prepare("{'%s': 'abcd', 'status': 'RUNNING'}"); + String actual = extract.apply(input); + Assert.assertEquals("abcd", actual); + } + + @Test + public void testShallowObjectWithIdMiddle() + { + ChildData input = prepare("{'before': 'something', '%s': 'abcd', 'status': 'RUNNING'}"); + String 
actual = extract.apply(input); + Assert.assertEquals("abcd", actual); + } + + @Test + public void testShallowObjectWithIdLast() + { + ChildData input = prepare("{'before': 'something', 'status': 'RUNNING', '%s': 'abcd'}"); + String actual = extract.apply(input); + Assert.assertEquals("abcd", actual); + } + + @Test + public void testShallowObjectWithNoId() + { + ChildData input = prepare("{'before': 'something', 'status': 'RUNNING'}"); + String actual = extract.apply(input); + Assert.assertNull(actual); + } + + @Test + public void testDeepObjectWithIdFirst() + { + ChildData input = prepare("{'%s': 'abcd', 'subobject': { 'subkey': 'subvalue' }, 'subarray': [{'key': 'val'}, 2, 3], 'status': 'RUNNING'}"); + String actual = extract.apply(input); + Assert.assertEquals("abcd", actual); + } + + @Test + public void testDeepObjectWithIdLast() + { + ChildData input = prepare("{'subobject': { 'subkey': 'subvalue' }, 'subarray': [{'key': 'val'}, 2, 3], 'status': 'RUNNING', '%s': 'abcd'}"); + String actual = extract.apply(input); + Assert.assertEquals("abcd", actual); + } + + @Test + public void testDeepObjectWithIdInNestedOnly() + { + ChildData input = prepare("{'subobject': { '%s': 'defg' }, 'subarray': [{'key': 'val'}, 2, 3], 'status': 'RUNNING'}"); + String actual = extract.apply(input); + Assert.assertNull(actual); + } + + @Test + public void testDeepObjectWithIdInNestedAndOuter() + { + ChildData input = prepare("{'subobject': { '%s': 'defg' }, 'subarray': [{'key': 'val'}, 2, 3], 'status': 'RUNNING', '%1$s': 'abcd'}"); + String actual = extract.apply(input); + Assert.assertEquals("abcd", actual); + } + + @Test + public void testIdWithWrongTypeReturnsNull() + { + ChildData input = prepare("{'%s': {'nested': 'obj'}'"); + String actual = extract.apply(input); + Assert.assertNull(actual); + } + + @Test + public void testCanReadIdFromAJacksonSerializedTaskAnnouncement() throws JsonProcessingException + { + final String expectedTaskId = "task01234"; + + Task task0 = 
NoopTask.create(expectedTaskId, 0); + TaskAnnouncement taskAnnouncement = TaskAnnouncement.create( + task0, + TaskStatus.running(task0.getId()), + TaskLocation.unknown() + ); + + ObjectMapper objectMapper = new ObjectMapper(); + + byte[] serialized = objectMapper.writeValueAsBytes(taskAnnouncement); + + ChildData zkNode = new ChildData("/a/b/c", new Stat(), serialized); + + String actualExtractedTaskId = extract.apply(zkNode); + Assert.assertEquals(expectedTaskId, actualExtractedTaskId); + } +}