diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 21264a9e396..e4b4b2f9c94 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -1445,6 +1445,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( @@ -1475,6 +1476,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); } EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(Status.READING)) .anyTimes(); @@ -2561,6 +2563,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( @@ -2603,6 +2606,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); } EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(Status.READING)) .anyTimes(); @@ -2647,6 +2651,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( @@ -2689,6 +2694,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); } EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(Status.READING)) .anyTimes(); @@ -2809,6 +2815,9 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) .andReturn(ImmutableList.of(id1, id2, id3)) .anyTimes(); @@ -2850,6 +2859,9 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.reset(taskRunner, taskClient, taskQueue); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskClient.pauseAsync("id2")) .andReturn(Futures.immediateFuture(singlePartitionMap(topic, 0, 15L, 1, 25L, 2, 30L))); EasyMock.expect(taskClient.setEndOffsetsAsync("id2", singlePartitionMap(topic, 0, 15L, 1, 25L, 2, 30L), true)) @@ -2871,6 +2883,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -2896,6 +2909,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -2952,6 +2966,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -3531,6 +3546,9 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) .andReturn(ImmutableList.of(id1, id2, id3)) .anyTimes(); @@ -3589,6 +3607,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 87297dd8f28..a2f3e98deca 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -1496,6 +1496,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata(null) @@ -1524,6 +1525,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); } EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)) .anyTimes(); @@ -1654,6 +1656,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); @@ -2200,6 +2203,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( @@ -2245,6 +2249,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes(); } EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes(); EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)) .anyTimes(); @@ -2308,6 +2313,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KinesisDataSourceMetadata( @@ -2330,6 +2336,8 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue); + EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes(); + TreeMap> checkpoints1 = new TreeMap<>(); checkpoints1.put(0, ImmutableMap.of( SHARD_ID1, @@ -2529,6 +2537,8 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); @@ -2580,6 +2590,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.reset(taskRunner, taskClient, taskQueue); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskClient.pauseAsync("id2")) .andReturn(Futures.immediateFuture(ImmutableMap.of( SHARD_ID1, @@ -3583,6 +3596,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java index 2940fc4f9c3..1894336043d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java @@ -19,7 +19,7 @@ package org.apache.druid.msq.indexing.client; -import org.apache.druid.client.indexing.TaskStatusResponse; +import com.google.common.collect.ImmutableSet; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -65,10 +65,13 @@ public class IndexerWorkerManagerClient implements WorkerManagerClient @Override public TaskLocation location(String workerId) { - final TaskStatusResponse response = FutureUtils.getUnchecked(overlordClient.taskStatus(workerId), true); + final TaskStatus response = FutureUtils.getUnchecked( + overlordClient.taskStatuses(ImmutableSet.of(workerId)), + true + ).get(workerId); - if (response.getStatus() != null) { - return response.getStatus().getLocation(); + if (response != null) { + return response.getLocation(); } else { return TaskLocation.unknown(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 3a06af69d1e..830ae9b732b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -32,6 +32,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.annotations.SuppressFBWarnings; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.DruidException; +import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; @@ -671,6 +672,8 @@ public class TaskQueue // Save status to metadata store first, so if we crash while doing the rest of the shutdown, our successor // remembers that this task has completed. try { + //The code block is only called when a task completes, + //and we need to check to make sure the metadata store has the correct status stored. final Optional previousStatus = taskStorage.getStatus(task.getId()); if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) { log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); @@ -945,6 +948,16 @@ public class TaskQueue } } + public Optional getTaskStatus(final String taskId) + { + RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId); + if (runnerTaskState != null && runnerTaskState != RunnerTaskState.NONE) { + return Optional.of(TaskStatus.running(taskId).withLocation(taskRunner.getTaskLocation(taskId))); + } else { + return taskStorage.getStatus(taskId); + } + } + public CoordinatorRunStats getQueueStats() { final int queuedUpdates = statusUpdatesInQueue.get(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index b529aa45854..8416b2e0968 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -469,9 +469,15 @@ public class OverlordResource return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build(); } + final Optional taskQueue = taskMaster.getTaskQueue(); Map result = Maps.newHashMapWithExpectedSize(taskIds.size()); for (String taskId : taskIds) { - Optional optional = taskStorageQueryAdapter.getStatus(taskId); + final Optional optional; + if (taskQueue.isPresent()) { + optional = taskQueue.get().getTaskStatus(taskId); + } else { + optional = taskStorageQueryAdapter.getStatus(taskId); + } if (optional.isPresent()) { result.put(taskId, optional.get()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 5e937fe69eb..62d3caa83d6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -27,11 +27,9 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -876,14 +874,7 @@ public abstract class SeekableStreamSupervisor taskRunner = taskMaster.getTaskRunner(); if (taskRunner.isPresent()) { - Optional item = Iterables.tryFind( - taskRunner.get().getRunningTasks(), - (Predicate) taskRunnerWorkItem -> id.equals(taskRunnerWorkItem.getTaskId()) - ); - - if (item.isPresent()) { - return item.get().getLocation(); - } + return taskRunner.get().getTaskLocation(id); } else { log.error("Failed to get task runner because I'm not the leader!"); } @@ -894,7 +885,12 @@ public abstract class SeekableStreamSupervisor getTaskStatus(String id) { - return taskStorage.getStatus(id); + final Optional taskQueue = taskMaster.getTaskQueue(); + if (taskQueue.isPresent()) { + return taskQueue.get().getTaskStatus(id); + } else { + return taskStorage.getStatus(id); + } } }; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index a1a93e29cbf..8ca1ff49f05 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -460,6 +460,66 @@ public class TaskQueueTest extends IngestionTestBase Assert.assertEquals(1L, stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES)); } + @Test + public void testGetTaskStatus() + { + final String newTask = "newTask"; + final String waitingTask = "waitingTask"; + final String pendingTask = "pendingTask"; + final String runningTask = "runningTask"; + final String successfulTask = "successfulTask"; + final String failedTask = "failedTask"; + + TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class); + EasyMock.expect(taskStorage.getStatus(newTask)) + .andReturn(Optional.of(TaskStatus.running(newTask))); + EasyMock.expect(taskStorage.getStatus(successfulTask)) + .andReturn(Optional.of(TaskStatus.success(successfulTask))); + EasyMock.expect(taskStorage.getStatus(failedTask)) + .andReturn(Optional.of(TaskStatus.failure(failedTask, failedTask))); + EasyMock.replay(taskStorage); + + TaskRunner taskRunner = EasyMock.createMock(HttpRemoteTaskRunner.class); + EasyMock.expect(taskRunner.getRunnerTaskState(newTask)) + .andReturn(null); + EasyMock.expect(taskRunner.getRunnerTaskState(waitingTask)) + .andReturn(RunnerTaskState.WAITING); + EasyMock.expect(taskRunner.getRunnerTaskState(pendingTask)) + .andReturn(RunnerTaskState.PENDING); + EasyMock.expect(taskRunner.getRunnerTaskState(runningTask)) + .andReturn(RunnerTaskState.RUNNING); + EasyMock.expect(taskRunner.getRunnerTaskState(successfulTask)) + .andReturn(RunnerTaskState.NONE); + EasyMock.expect(taskRunner.getRunnerTaskState(failedTask)) + .andReturn(RunnerTaskState.NONE); + EasyMock.expect(taskRunner.getTaskLocation(waitingTask)) + .andReturn(TaskLocation.unknown()); + EasyMock.expect(taskRunner.getTaskLocation(pendingTask)) + .andReturn(TaskLocation.unknown()); + EasyMock.expect(taskRunner.getTaskLocation(runningTask)) + .andReturn(TaskLocation.create("host", 8100, 8100)); + EasyMock.replay(taskRunner); + + final TaskQueue taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, null, null, null, null), + new DefaultTaskConfig(), + taskStorage, + taskRunner, + createActionClientFactory(), + getLockbox(), + new StubServiceEmitter("druid/overlord", "testHost") + ); + taskQueue.setActive(true); + + Assert.assertEquals(TaskStatus.running(newTask), taskQueue.getTaskStatus(newTask).get()); + Assert.assertEquals(TaskStatus.running(waitingTask), taskQueue.getTaskStatus(waitingTask).get()); + Assert.assertEquals(TaskStatus.running(pendingTask), taskQueue.getTaskStatus(pendingTask).get()); + Assert.assertEquals(TaskStatus.running(runningTask), taskQueue.getTaskStatus(runningTask).get()); + Assert.assertEquals(TaskStatus.success(successfulTask), taskQueue.getTaskStatus(successfulTask).get()); + Assert.assertEquals(TaskStatus.failure(failedTask, failedTask), taskQueue.getTaskStatus(failedTask).get()); + } + private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List runningTasks) { HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 8c1b6765431..19eb1f8e7a2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -1794,6 +1794,82 @@ public class OverlordResourceTest Assert.assertEquals(expectedResourceActions, resourceActions); } + @Test + public void testGetMultipleTaskStatuses_presentTaskQueue() + { + // Needed for teardown + EasyMock.replay( + authConfig, + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter + ); + + TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class); + EasyMock.expect(taskQueue.getTaskStatus("task")) + .andReturn(Optional.of(TaskStatus.running("task"))); + EasyMock.replay(taskQueue); + TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)); + EasyMock.replay(taskMaster); + OverlordResource overlordResource = new OverlordResource( + taskMaster, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task")) + .getEntity(); + Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response); + } + + @Test + public void testGetMultipleTaskStatuses_absentTaskQueue() + { + // Needed for teardown + EasyMock.replay( + authConfig, + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter + ); + + TaskStorageQueryAdapter taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class); + EasyMock.expect(taskStorageQueryAdapter.getStatus("task")) + .andReturn(Optional.of(TaskStatus.running("task"))); + EasyMock.replay(taskStorageQueryAdapter); + TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()); + EasyMock.replay(taskMaster); + OverlordResource overlordResource = new OverlordResource( + taskMaster, + taskStorageQueryAdapter, + null, + null, + null, + null, + null, + null, + null, + null + ); + final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task")) + .getEntity(); + Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response); + } + private void expectAuthorizationTokenCheck() { expectAuthorizationTokenCheck(Users.DRUID); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 840b4e9f69a..ea083423eaa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1101,6 +1101,9 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport workItems.add(new TestTaskRunnerWorkItem(id3, null, location3)); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(location3).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) .andReturn(ImmutableList.of(id1, id2, id3)) .anyTimes(); diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java index 0276e768ba9..163c7e14e01 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java @@ -19,16 +19,16 @@ package org.apache.druid.rpc.indexing; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.errorprone.annotations.concurrent.GuardedBy; -import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; -import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexer.TaskStatus; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.rpc.ServiceLocation; @@ -36,9 +36,11 @@ import org.apache.druid.rpc.ServiceLocations; import org.apache.druid.rpc.ServiceLocator; import java.util.Collections; +import java.util.Map; +import java.util.Set; /** - * Service locator for a specific task. Uses the {@link OverlordClient#taskStatus} API to locate tasks. + * Service locator for a specific task. Uses the {@link OverlordClient#taskStatuses(Set)} API to locate tasks. * * This locator has an internal cache that is updated if the last check has been over {@link #LOCATION_CACHE_MS} ago. * @@ -85,10 +87,10 @@ public class SpecificTaskServiceLocator implements ServiceLocator } else if (closed || lastKnownState != TaskState.RUNNING) { return Futures.immediateFuture(ServiceLocations.closed()); } else if (lastKnownLocation == null || lastUpdateTime + LOCATION_CACHE_MS < System.currentTimeMillis()) { - final ListenableFuture taskStatusFuture; + final ListenableFuture> taskStatusFuture; try { - taskStatusFuture = overlordClient.taskStatus(taskId); + taskStatusFuture = overlordClient.taskStatuses(ImmutableSet.of(taskId)); } catch (Exception e) { throw new RuntimeException(e); @@ -110,31 +112,31 @@ public class SpecificTaskServiceLocator implements ServiceLocator Futures.addCallback( taskStatusFuture, - new FutureCallback() + new FutureCallback>() { @Override - public void onSuccess(final TaskStatusResponse taskStatus) + public void onSuccess(final Map taskStatusMap) { synchronized (lock) { if (pendingFuture != null) { lastUpdateTime = System.currentTimeMillis(); - final TaskStatusPlus statusPlus = taskStatus.getStatus(); + final TaskStatus status = taskStatusMap.get(taskId); - if (statusPlus == null) { + if (status == null) { // If the task status is unknown, we'll treat it as closed. lastKnownState = null; lastKnownLocation = null; } else { - lastKnownState = statusPlus.getStatusCode(); + lastKnownState = status.getStatusCode(); - if (TaskLocation.unknown().equals(statusPlus.getLocation())) { + if (TaskLocation.unknown().equals(status.getLocation())) { lastKnownLocation = null; } else { lastKnownLocation = new ServiceLocation( - statusPlus.getLocation().getHost(), - statusPlus.getLocation().getPort(), - statusPlus.getLocation().getTlsPort(), + status.getLocation().getHost(), + status.getLocation().getPort(), + status.getLocation().getTlsPort(), StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId)) ); } diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java index 85ae025c734..4888078af5d 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java @@ -22,11 +22,9 @@ package org.apache.druid.rpc.indexing; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; -import org.apache.druid.indexer.TaskStatusPlus; -import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.indexer.TaskStatus; import org.apache.druid.java.util.common.ISE; import org.apache.druid.rpc.ServiceLocation; import org.apache.druid.rpc.ServiceLocations; @@ -43,6 +41,7 @@ import org.mockito.junit.MockitoRule; import org.mockito.quality.Strictness; import java.util.Collections; +import java.util.Map; import java.util.concurrent.ExecutionException; public class SpecificTaskServiceLocatorTest @@ -61,8 +60,8 @@ public class SpecificTaskServiceLocatorTest @Test public void test_locate_noLocationYet() throws Exception { - Mockito.when(overlordClient.taskStatus(TASK_ID)) - .thenReturn(makeResponse(TaskState.RUNNING, TaskLocation.unknown())); + Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID))) + .thenReturn(status(TaskState.RUNNING, TaskLocation.unknown())); final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient); final ListenableFuture future = locator.locate(); @@ -72,8 +71,8 @@ public class SpecificTaskServiceLocatorTest @Test public void test_locate_taskRunning() throws Exception { - Mockito.when(overlordClient.taskStatus(TASK_ID)) - .thenReturn(makeResponse(TaskState.RUNNING, TASK_LOCATION1)); + Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID))) + .thenReturn(status(TaskState.RUNNING, TASK_LOCATION1)); final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient); Assert.assertEquals(ServiceLocations.forLocation(SERVICE_LOCATION1), locator.locate().get()); @@ -82,8 +81,8 @@ public class SpecificTaskServiceLocatorTest @Test public void test_locate_taskNotFound() throws Exception { - Mockito.when(overlordClient.taskStatus(TASK_ID)) - .thenReturn(Futures.immediateFuture(new TaskStatusResponse(TASK_ID, null))); + Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID))) + .thenReturn(Futures.immediateFuture(Collections.singletonMap(TASK_ID, null))); final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient); final ListenableFuture future = locator.locate(); @@ -93,8 +92,8 @@ public class SpecificTaskServiceLocatorTest @Test public void test_locate_taskSuccess() throws Exception { - Mockito.when(overlordClient.taskStatus(TASK_ID)) - .thenReturn(makeResponse(TaskState.SUCCESS, TaskLocation.unknown())); + Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID))) + .thenReturn(status(TaskState.SUCCESS, TaskLocation.unknown())); final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient); final ListenableFuture future = locator.locate(); @@ -104,8 +103,8 @@ public class SpecificTaskServiceLocatorTest @Test public void test_locate_taskFailed() throws Exception { - Mockito.when(overlordClient.taskStatus(TASK_ID)) - .thenReturn(makeResponse(TaskState.FAILED, TaskLocation.unknown())); + Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID))) + .thenReturn(status(TaskState.FAILED, TaskLocation.unknown())); final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient); final ListenableFuture future = locator.locate(); @@ -115,7 +114,7 @@ public class SpecificTaskServiceLocatorTest @Test public void test_locate_overlordError() { - Mockito.when(overlordClient.taskStatus(TASK_ID)) + Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID))) .thenReturn(Futures.immediateFailedFuture(new ISE("oh no"))); final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient); @@ -134,8 +133,8 @@ public class SpecificTaskServiceLocatorTest public void test_locate_afterClose() throws Exception { // Overlord call will never return. - final SettableFuture overlordFuture = SettableFuture.create(); - Mockito.when(overlordClient.taskStatus(TASK_ID)) + final SettableFuture> overlordFuture = SettableFuture.create(); + Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID))) .thenReturn(overlordFuture); final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient); @@ -147,26 +146,15 @@ public class SpecificTaskServiceLocatorTest Assert.assertTrue(overlordFuture.isCancelled()); } - private static ListenableFuture makeResponse(final TaskState state, final TaskLocation location) + private static ListenableFuture> status(final TaskState state, final TaskLocation location) { - final TaskStatusResponse response = new TaskStatusResponse( + final TaskStatus status = new TaskStatus( TASK_ID, - new TaskStatusPlus( - TASK_ID, - null, - null, - DateTimes.utc(0), - DateTimes.utc(0), - state, - null, - null, - 1L, - location, - null, - null - ) + state, + 1L, + null, + location ); - - return Futures.immediateFuture(response); + return Futures.immediateFuture(Collections.singletonMap(TASK_ID, status)); } }