diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index ae0708344db..69762ba7190 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -947,6 +947,17 @@ public class TaskQueue return stats; } + public Optional getActiveTask(String id) + { + giant.lock(); + try { + return Optional.fromNullable(tasks.get(id)); + } + finally { + giant.unlock(); + } + } + @VisibleForTesting List getTasks() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index 140d9b7ac40..ba2ca3c7066 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -48,12 +48,14 @@ public class TaskStorageQueryAdapter { private final TaskStorage storage; private final TaskLockbox taskLockbox; + private final Optional taskQueue; @Inject - public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox) + public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster) { this.storage = storage; this.taskLockbox = taskLockbox; + this.taskQueue = taskMaster.getTaskQueue(); } public List getActiveTasks() @@ -104,6 +106,12 @@ public class TaskStorageQueryAdapter public Optional getTask(final String taskid) { + if (taskQueue.isPresent()) { + Optional activeTask = taskQueue.get().getActiveTask(taskid); + if (activeTask.isPresent()) { + return activeTask; + } + } return storage.getTask(taskid); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 69f0039f615..572364a56a2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -482,7 +482,10 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest default: throw new RE("Unknown task storage type [%s]", taskStorageType); } - tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox); + TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster); return taskStorage; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 12be2ceafa6..f9ce36df181 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -257,7 +257,7 @@ public class OverlordTest Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort()); Assert.assertEquals(Optional.absent(), taskMaster.getRedirectLocation()); - final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox); + final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster); final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null); // Test Overlord resource stuff overlordResource = new OverlordResource(