Try to fetch the task status for an active from memory (#15724)

* Reduce metadata calls to fetch the status for an active task
This commit is contained in:
AmatyaAvadhanula 2024-02-26 13:53:05 +05:30 committed by GitHub
parent ebb7190545
commit e2b7289dea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 246 additions and 64 deletions

View File

@ -1445,6 +1445,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@ -1475,6 +1476,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
@ -2561,6 +2563,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@ -2603,6 +2606,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
@ -2647,6 +2651,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@ -2689,6 +2694,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
@ -2809,6 +2815,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
.andReturn(ImmutableList.of(id1, id2, id3))
.anyTimes();
@ -2850,6 +2859,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.reset(taskRunner, taskClient, taskQueue);
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture(singlePartitionMap(topic, 0, 15L, 1, 25L, 2, 30L)));
EasyMock.expect(taskClient.setEndOffsetsAsync("id2", singlePartitionMap(topic, 0, 15L, 1, 25L, 2, 30L), true))
@ -2871,6 +2883,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
replayAll();
@ -2896,6 +2909,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
replayAll();
@ -2952,6 +2966,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
replayAll();
@ -3531,6 +3546,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
.andReturn(ImmutableList.of(id1, id2, id3))
.anyTimes();
@ -3589,6 +3607,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
replayAll();

View File

@ -1496,6 +1496,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KinesisDataSourceMetadata(null)
@ -1524,6 +1525,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();
@ -1654,6 +1656,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
@ -2200,6 +2203,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KinesisDataSourceMetadata(
@ -2245,6 +2249,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();
@ -2308,6 +2313,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KinesisDataSourceMetadata(
@ -2330,6 +2336,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue);
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>();
checkpoints1.put(0, ImmutableMap.of(
SHARD_ID1,
@ -2529,6 +2537,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
@ -2580,6 +2590,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.reset(taskRunner, taskClient, taskQueue);
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture(ImmutableMap.of(
SHARD_ID1,
@ -3583,6 +3596,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();

View File

@ -19,7 +19,7 @@
package org.apache.druid.msq.indexing.client;
import org.apache.druid.client.indexing.TaskStatusResponse;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@ -65,10 +65,13 @@ public class IndexerWorkerManagerClient implements WorkerManagerClient
@Override
public TaskLocation location(String workerId)
{
final TaskStatusResponse response = FutureUtils.getUnchecked(overlordClient.taskStatus(workerId), true);
final TaskStatus response = FutureUtils.getUnchecked(
overlordClient.taskStatuses(ImmutableSet.of(workerId)),
true
).get(workerId);
if (response.getStatus() != null) {
return response.getStatus().getLocation();
if (response != null) {
return response.getLocation();
} else {
return TaskLocation.unknown();
}

View File

@ -32,6 +32,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
@ -671,6 +672,8 @@ public class TaskQueue
// Save status to metadata store first, so if we crash while doing the rest of the shutdown, our successor
// remembers that this task has completed.
try {
//The code block is only called when a task completes,
//and we need to check to make sure the metadata store has the correct status stored.
final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
@ -945,6 +948,16 @@ public class TaskQueue
}
}
public Optional<TaskStatus> getTaskStatus(final String taskId)
{
RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId);
if (runnerTaskState != null && runnerTaskState != RunnerTaskState.NONE) {
return Optional.of(TaskStatus.running(taskId).withLocation(taskRunner.getTaskLocation(taskId)));
} else {
return taskStorage.getStatus(taskId);
}
}
public CoordinatorRunStats getQueueStats()
{
final int queuedUpdates = statusUpdatesInQueue.get();

View File

@ -469,9 +469,15 @@ public class OverlordResource
return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build();
}
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
Map<String, TaskStatus> result = Maps.newHashMapWithExpectedSize(taskIds.size());
for (String taskId : taskIds) {
Optional<TaskStatus> optional = taskStorageQueryAdapter.getStatus(taskId);
final Optional<TaskStatus> optional;
if (taskQueue.isPresent()) {
optional = taskQueue.get().getTaskStatus(taskId);
} else {
optional = taskStorageQueryAdapter.getStatus(taskId);
}
if (optional.isPresent()) {
result.put(taskId, optional.get());
}

View File

@ -27,11 +27,9 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -876,14 +874,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Preconditions.checkNotNull(id, "id");
Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
if (taskRunner.isPresent()) {
Optional<? extends TaskRunnerWorkItem> item = Iterables.tryFind(
taskRunner.get().getRunningTasks(),
(Predicate<TaskRunnerWorkItem>) taskRunnerWorkItem -> id.equals(taskRunnerWorkItem.getTaskId())
);
if (item.isPresent()) {
return item.get().getLocation();
}
return taskRunner.get().getTaskLocation(id);
} else {
log.error("Failed to get task runner because I'm not the leader!");
}
@ -894,7 +885,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@Override
public Optional<TaskStatus> getTaskStatus(String id)
{
return taskStorage.getStatus(id);
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
return taskQueue.get().getTaskStatus(id);
} else {
return taskStorage.getStatus(id);
}
}
};

View File

@ -460,6 +460,66 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertEquals(1L, stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES));
}
@Test
public void testGetTaskStatus()
{
final String newTask = "newTask";
final String waitingTask = "waitingTask";
final String pendingTask = "pendingTask";
final String runningTask = "runningTask";
final String successfulTask = "successfulTask";
final String failedTask = "failedTask";
TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class);
EasyMock.expect(taskStorage.getStatus(newTask))
.andReturn(Optional.of(TaskStatus.running(newTask)));
EasyMock.expect(taskStorage.getStatus(successfulTask))
.andReturn(Optional.of(TaskStatus.success(successfulTask)));
EasyMock.expect(taskStorage.getStatus(failedTask))
.andReturn(Optional.of(TaskStatus.failure(failedTask, failedTask)));
EasyMock.replay(taskStorage);
TaskRunner taskRunner = EasyMock.createMock(HttpRemoteTaskRunner.class);
EasyMock.expect(taskRunner.getRunnerTaskState(newTask))
.andReturn(null);
EasyMock.expect(taskRunner.getRunnerTaskState(waitingTask))
.andReturn(RunnerTaskState.WAITING);
EasyMock.expect(taskRunner.getRunnerTaskState(pendingTask))
.andReturn(RunnerTaskState.PENDING);
EasyMock.expect(taskRunner.getRunnerTaskState(runningTask))
.andReturn(RunnerTaskState.RUNNING);
EasyMock.expect(taskRunner.getRunnerTaskState(successfulTask))
.andReturn(RunnerTaskState.NONE);
EasyMock.expect(taskRunner.getRunnerTaskState(failedTask))
.andReturn(RunnerTaskState.NONE);
EasyMock.expect(taskRunner.getTaskLocation(waitingTask))
.andReturn(TaskLocation.unknown());
EasyMock.expect(taskRunner.getTaskLocation(pendingTask))
.andReturn(TaskLocation.unknown());
EasyMock.expect(taskRunner.getTaskLocation(runningTask))
.andReturn(TaskLocation.create("host", 8100, 8100));
EasyMock.replay(taskRunner);
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
taskStorage,
taskRunner,
createActionClientFactory(),
getLockbox(),
new StubServiceEmitter("druid/overlord", "testHost")
);
taskQueue.setActive(true);
Assert.assertEquals(TaskStatus.running(newTask), taskQueue.getTaskStatus(newTask).get());
Assert.assertEquals(TaskStatus.running(waitingTask), taskQueue.getTaskStatus(waitingTask).get());
Assert.assertEquals(TaskStatus.running(pendingTask), taskQueue.getTaskStatus(pendingTask).get());
Assert.assertEquals(TaskStatus.running(runningTask), taskQueue.getTaskStatus(runningTask).get());
Assert.assertEquals(TaskStatus.success(successfulTask), taskQueue.getTaskStatus(successfulTask).get());
Assert.assertEquals(TaskStatus.failure(failedTask, failedTask), taskQueue.getTaskStatus(failedTask).get());
}
private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List<String> runningTasks)
{
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery();

View File

@ -1794,6 +1794,82 @@ public class OverlordResourceTest
Assert.assertEquals(expectedResourceActions, resourceActions);
}
@Test
public void testGetMultipleTaskStatuses_presentTaskQueue()
{
// Needed for teardown
EasyMock.replay(
authConfig,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class);
EasyMock.expect(taskQueue.getTaskStatus("task"))
.andReturn(Optional.of(TaskStatus.running("task")));
EasyMock.replay(taskQueue);
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue));
EasyMock.replay(taskMaster);
OverlordResource overlordResource = new OverlordResource(
taskMaster,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task"))
.getEntity();
Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response);
}
@Test
public void testGetMultipleTaskStatuses_absentTaskQueue()
{
// Needed for teardown
EasyMock.replay(
authConfig,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
TaskStorageQueryAdapter taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class);
EasyMock.expect(taskStorageQueryAdapter.getStatus("task"))
.andReturn(Optional.of(TaskStatus.running("task")));
EasyMock.replay(taskStorageQueryAdapter);
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent());
EasyMock.replay(taskMaster);
OverlordResource overlordResource = new OverlordResource(
taskMaster,
taskStorageQueryAdapter,
null,
null,
null,
null,
null,
null,
null,
null
);
final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task"))
.getEntity();
Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response);
}
private void expectAuthorizationTokenCheck()
{
expectAuthorizationTokenCheck(Users.DRUID);

View File

@ -1101,6 +1101,9 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
workItems.add(new TestTaskRunnerWorkItem(id3, null, location3));
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(location3).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
.andReturn(ImmutableList.of(id1, id2, id3))
.anyTimes();

View File

@ -19,16 +19,16 @@
package org.apache.druid.rpc.indexing;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.rpc.ServiceLocation;
@ -36,9 +36,11 @@ import org.apache.druid.rpc.ServiceLocations;
import org.apache.druid.rpc.ServiceLocator;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
/**
* Service locator for a specific task. Uses the {@link OverlordClient#taskStatus} API to locate tasks.
* Service locator for a specific task. Uses the {@link OverlordClient#taskStatuses(Set)} API to locate tasks.
*
* This locator has an internal cache that is updated if the last check has been over {@link #LOCATION_CACHE_MS} ago.
*
@ -85,10 +87,10 @@ public class SpecificTaskServiceLocator implements ServiceLocator
} else if (closed || lastKnownState != TaskState.RUNNING) {
return Futures.immediateFuture(ServiceLocations.closed());
} else if (lastKnownLocation == null || lastUpdateTime + LOCATION_CACHE_MS < System.currentTimeMillis()) {
final ListenableFuture<TaskStatusResponse> taskStatusFuture;
final ListenableFuture<Map<String, TaskStatus>> taskStatusFuture;
try {
taskStatusFuture = overlordClient.taskStatus(taskId);
taskStatusFuture = overlordClient.taskStatuses(ImmutableSet.of(taskId));
}
catch (Exception e) {
throw new RuntimeException(e);
@ -110,31 +112,31 @@ public class SpecificTaskServiceLocator implements ServiceLocator
Futures.addCallback(
taskStatusFuture,
new FutureCallback<TaskStatusResponse>()
new FutureCallback<Map<String, TaskStatus>>()
{
@Override
public void onSuccess(final TaskStatusResponse taskStatus)
public void onSuccess(final Map<String, TaskStatus> taskStatusMap)
{
synchronized (lock) {
if (pendingFuture != null) {
lastUpdateTime = System.currentTimeMillis();
final TaskStatusPlus statusPlus = taskStatus.getStatus();
final TaskStatus status = taskStatusMap.get(taskId);
if (statusPlus == null) {
if (status == null) {
// If the task status is unknown, we'll treat it as closed.
lastKnownState = null;
lastKnownLocation = null;
} else {
lastKnownState = statusPlus.getStatusCode();
lastKnownState = status.getStatusCode();
if (TaskLocation.unknown().equals(statusPlus.getLocation())) {
if (TaskLocation.unknown().equals(status.getLocation())) {
lastKnownLocation = null;
} else {
lastKnownLocation = new ServiceLocation(
statusPlus.getLocation().getHost(),
statusPlus.getLocation().getPort(),
statusPlus.getLocation().getTlsPort(),
status.getLocation().getHost(),
status.getLocation().getPort(),
status.getLocation().getTlsPort(),
StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
);
}

View File

@ -22,11 +22,9 @@ package org.apache.druid.rpc.indexing;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.ServiceLocations;
@ -43,6 +41,7 @@ import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class SpecificTaskServiceLocatorTest
@ -61,8 +60,8 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_noLocationYet() throws Exception
{
Mockito.when(overlordClient.taskStatus(TASK_ID))
.thenReturn(makeResponse(TaskState.RUNNING, TaskLocation.unknown()));
Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
.thenReturn(status(TaskState.RUNNING, TaskLocation.unknown()));
final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
final ListenableFuture<ServiceLocations> future = locator.locate();
@ -72,8 +71,8 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_taskRunning() throws Exception
{
Mockito.when(overlordClient.taskStatus(TASK_ID))
.thenReturn(makeResponse(TaskState.RUNNING, TASK_LOCATION1));
Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
.thenReturn(status(TaskState.RUNNING, TASK_LOCATION1));
final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
Assert.assertEquals(ServiceLocations.forLocation(SERVICE_LOCATION1), locator.locate().get());
@ -82,8 +81,8 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_taskNotFound() throws Exception
{
Mockito.when(overlordClient.taskStatus(TASK_ID))
.thenReturn(Futures.immediateFuture(new TaskStatusResponse(TASK_ID, null)));
Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
.thenReturn(Futures.immediateFuture(Collections.singletonMap(TASK_ID, null)));
final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
final ListenableFuture<ServiceLocations> future = locator.locate();
@ -93,8 +92,8 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_taskSuccess() throws Exception
{
Mockito.when(overlordClient.taskStatus(TASK_ID))
.thenReturn(makeResponse(TaskState.SUCCESS, TaskLocation.unknown()));
Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
.thenReturn(status(TaskState.SUCCESS, TaskLocation.unknown()));
final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
final ListenableFuture<ServiceLocations> future = locator.locate();
@ -104,8 +103,8 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_taskFailed() throws Exception
{
Mockito.when(overlordClient.taskStatus(TASK_ID))
.thenReturn(makeResponse(TaskState.FAILED, TaskLocation.unknown()));
Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
.thenReturn(status(TaskState.FAILED, TaskLocation.unknown()));
final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
final ListenableFuture<ServiceLocations> future = locator.locate();
@ -115,7 +114,7 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_overlordError()
{
Mockito.when(overlordClient.taskStatus(TASK_ID))
Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
.thenReturn(Futures.immediateFailedFuture(new ISE("oh no")));
final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
@ -134,8 +133,8 @@ public class SpecificTaskServiceLocatorTest
public void test_locate_afterClose() throws Exception
{
// Overlord call will never return.
final SettableFuture<TaskStatusResponse> overlordFuture = SettableFuture.create();
Mockito.when(overlordClient.taskStatus(TASK_ID))
final SettableFuture<Map<String, TaskStatus>> overlordFuture = SettableFuture.create();
Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
.thenReturn(overlordFuture);
final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
@ -147,26 +146,15 @@ public class SpecificTaskServiceLocatorTest
Assert.assertTrue(overlordFuture.isCancelled());
}
private static ListenableFuture<TaskStatusResponse> makeResponse(final TaskState state, final TaskLocation location)
private static ListenableFuture<Map<String, TaskStatus>> status(final TaskState state, final TaskLocation location)
{
final TaskStatusResponse response = new TaskStatusResponse(
final TaskStatus status = new TaskStatus(
TASK_ID,
new TaskStatusPlus(
TASK_ID,
null,
null,
DateTimes.utc(0),
DateTimes.utc(0),
state,
null,
null,
1L,
location,
null,
null
)
state,
1L,
null,
location
);
return Futures.immediateFuture(response);
return Futures.immediateFuture(Collections.singletonMap(TASK_ID, status));
}
}