Support getTaskLocation for mixed task runner (#15033)

The KubernetesAndWorkerTaskRunner currently doesn't implement getTaskLocation, so tasks run by it will show a unknown TaskLocation in the druid console after a task has completed.

Fix bug in KubernetesAndWorkerTaskRunner that manifests as missing information in the druid Web Console.
This commit is contained in:
George Shiqi Wu 2023-09-26 23:27:36 -04:00 committed by GitHub
parent 3dabfead05
commit 8e22a178cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 0 deletions

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
@ -232,6 +233,16 @@ public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTas
return Optional.absent();
}
@Override
public TaskLocation getTaskLocation(String taskId)
{
TaskLocation taskLocation = kubernetesTaskRunner.getTaskLocation(taskId);
if (taskLocation == null || taskLocation.equals(TaskLocation.unknown())) {
return workerTaskRunner.getTaskLocation(taskId);
}
return taskLocation;
}
@Nullable
@Override
public RunnerTaskState getRunnerTaskState(String taskId)
@ -265,4 +276,17 @@ public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTas
}
return Math.max(0, k8sCapacity) + Math.max(0, workerCapacity);
}
// Worker task runners do not implement these methods
@Override
public void updateStatus(Task task, TaskStatus status)
{
kubernetesTaskRunner.updateStatus(task, status);
}
@Override
public void updateLocation(Task task, TaskLocation location)
{
kubernetesTaskRunner.updateLocation(task, location);
}
}

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
@ -300,4 +301,44 @@ public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
Assert.assertEquals(0, runner.restore().size());
verifyAll();
}
@Test
public void test_getTaskLocation_kubernetes()
{
TaskLocation kubernetesTaskLocation = TaskLocation.create("host", 0, 0, false);
EasyMock.expect(kubernetesTaskRunner.getTaskLocation(ID)).andReturn(kubernetesTaskLocation);
replayAll();
Assert.assertEquals(kubernetesTaskLocation, runner.getTaskLocation(ID));
verifyAll();
}
@Test
public void test_getTaskLocation_worker()
{
TaskLocation workerTaskLocation = TaskLocation.create("host", 0, 0, false);
EasyMock.expect(kubernetesTaskRunner.getTaskLocation(ID)).andReturn(TaskLocation.unknown());
EasyMock.expect(workerTaskRunner.getTaskLocation(ID)).andReturn(workerTaskLocation);
replayAll();
Assert.assertEquals(workerTaskLocation, runner.getTaskLocation(ID));
verifyAll();
}
@Test
public void test_updateStatus()
{
kubernetesTaskRunner.updateStatus(task, TaskStatus.running(ID));
replayAll();
runner.updateStatus(task, TaskStatus.running(ID));
verifyAll();
}
@Test
public void test_updateLocation()
{
kubernetesTaskRunner.updateLocation(task, TaskLocation.unknown());
replayAll();
runner.updateLocation(task, TaskLocation.unknown());
verifyAll();
}
}