From 77828bead48edd02fa73d1604c7476c6b7c14803 Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Fri, 17 Nov 2023 12:19:20 +0530 Subject: [PATCH] Fetch active task payloads from memory (#15377) The TaskQueue maintains a map of active task ids to tasks, which can be utilized to get active task payloads, before falling back to the metadata store. --- .../org/apache/druid/indexing/overlord/TaskQueue.java | 11 +++++++++++ .../indexing/overlord/TaskStorageQueryAdapter.java | 10 +++++++++- .../druid/indexing/overlord/TaskLifecycleTest.java | 5 ++++- .../druid/indexing/overlord/http/OverlordTest.java | 2 +- 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index ae0708344db..69762ba7190 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -947,6 +947,17 @@ public class TaskQueue return stats; } + public Optional getActiveTask(String id) + { + giant.lock(); + try { + return Optional.fromNullable(tasks.get(id)); + } + finally { + giant.unlock(); + } + } + @VisibleForTesting List getTasks() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index 140d9b7ac40..ba2ca3c7066 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -48,12 +48,14 @@ public class TaskStorageQueryAdapter { private final TaskStorage storage; private final TaskLockbox taskLockbox; + private final Optional taskQueue; @Inject - public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox) + public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster) { this.storage = storage; this.taskLockbox = taskLockbox; + this.taskQueue = taskMaster.getTaskQueue(); } public List getActiveTasks() @@ -104,6 +106,12 @@ public class TaskStorageQueryAdapter public Optional getTask(final String taskid) { + if (taskQueue.isPresent()) { + Optional activeTask = taskQueue.get().getActiveTask(taskid); + if (activeTask.isPresent()) { + return activeTask; + } + } return storage.getTask(taskid); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 69f0039f615..572364a56a2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -482,7 +482,10 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest default: throw new RE("Unknown task storage type [%s]", taskStorageType); } - tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox); + TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster); return taskStorage; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 12be2ceafa6..f9ce36df181 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -257,7 +257,7 @@ public class OverlordTest Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort()); Assert.assertEquals(Optional.absent(), taskMaster.getRedirectLocation()); - final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox); + final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster); final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null); // Test Overlord resource stuff overlordResource = new OverlordResource(