Fixing the K8s task runner to work with MSQ (#13305)

* Fixing the K8s task runner to work with MSQ

* Sorry incomplete PR

Co-authored-by: Rahul Gidwani <r_gidwani@apple.com>
This commit is contained in:
Churro 2022-11-08 01:11:05 -08:00 committed by GitHub
parent a28b8c2674
commit 9a684af3c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 9 deletions

View File

@ -155,6 +155,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
synchronized (tasks) { synchronized (tasks) {
tasks.computeIfAbsent( tasks.computeIfAbsent(
task.getId(), k -> new K8sWorkItem( task.getId(), k -> new K8sWorkItem(
client,
task, task,
exec.submit(() -> { exec.submit(() -> {
K8sTaskId k8sTaskId = new K8sTaskId(task); K8sTaskId k8sTaskId = new K8sTaskId(task);
@ -375,7 +376,12 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
try { try {
Task task = adapter.toTask(existingTask); Task task = adapter.toTask(existingTask);
ListenableFuture<TaskStatus> future = run(task); ListenableFuture<TaskStatus> future = run(task);
result.add(new K8sWorkItem(task, future, DateTimes.of(existingTask.getMetadata().getCreationTimestamp()))); result.add(new K8sWorkItem(
client,
task,
future,
DateTimes.of(existingTask.getMetadata().getCreationTimestamp())
));
} }
catch (IOException e) { catch (IOException e) {
log.error("Error deserializing task from pod: " + existingTask.getMetadata().getName()); log.error("Error deserializing task from pod: " + existingTask.getMetadata().getName());
@ -457,7 +463,12 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
try { try {
Task task = adapter.toTask(existingTask); Task task = adapter.toTask(existingTask);
ListenableFuture<TaskStatus> future = run(task); ListenableFuture<TaskStatus> future = run(task);
result.add(new K8sWorkItem(task, future, DateTime.parse(existingTask.getMetadata().getCreationTimestamp()))); result.add(new K8sWorkItem(
client,
task,
future,
DateTime.parse(existingTask.getMetadata().getCreationTimestamp())
));
} }
catch (IOException e) { catch (IOException e) {
log.error("Error deserializing task from pod: " + existingTask.getMetadata().getName()); log.error("Error deserializing task from pod: " + existingTask.getMetadata().getName());
@ -494,20 +505,28 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
} }
} }
public class K8sWorkItem extends TaskRunnerWorkItem public static class K8sWorkItem extends TaskRunnerWorkItem
{ {
private final Task task; private final Task task;
private KubernetesPeonClient client;
private K8sWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture) public K8sWorkItem(KubernetesPeonClient client, Task task, ListenableFuture<TaskStatus> statusFuture)
{ {
super(task.getId(), statusFuture); super(task.getId(), statusFuture);
this.task = task; this.task = task;
this.client = client;
} }
private K8sWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture, DateTime createdTime) public K8sWorkItem(
KubernetesPeonClient client,
Task task,
ListenableFuture<TaskStatus> statusFuture,
DateTime createdTime
)
{ {
super(task.getId(), statusFuture, createdTime, createdTime); super(task.getId(), statusFuture, createdTime, createdTime);
this.task = task; this.task = task;
this.client = client;
} }
@Override @Override
@ -516,6 +535,9 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
K8sTaskId taskId = new K8sTaskId(task.getId()); K8sTaskId taskId = new K8sTaskId(task.getId());
try { try {
Pod mainPod = client.getMainJobPod(new K8sTaskId(task.getId())); Pod mainPod = client.getMainJobPod(new K8sTaskId(task.getId()));
if (mainPod.getStatus() == null || mainPod.getStatus().getPodIP() == null) {
return TaskLocation.unknown();
}
boolean tlsEnabled = Boolean.parseBoolean( boolean tlsEnabled = Boolean.parseBoolean(
mainPod.getMetadata() mainPod.getMetadata()
.getAnnotations() .getAnnotations()

View File

@ -52,6 +52,7 @@ import org.apache.druid.k8s.overlord.common.DruidKubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.JobResponse; import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskAdapter; import org.apache.druid.k8s.overlord.common.K8sTaskAdapter;
import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.PeonCommandContext; import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.k8s.overlord.common.PeonPhase; import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
@ -161,7 +162,10 @@ class KubernetesTaskRunnerTest
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job)); when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job));
when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse(
job,
PeonPhase.SUCCEEDED
));
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
@ -214,7 +218,10 @@ class KubernetesTaskRunnerTest
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null)); when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null));
when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod);
when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse(
job,
PeonPhase.SUCCEEDED
));
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
@ -278,7 +285,10 @@ class KubernetesTaskRunnerTest
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job)); when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job));
when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod);
when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse(
job,
PeonPhase.SUCCEEDED
));
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
@ -339,7 +349,10 @@ class KubernetesTaskRunnerTest
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job)); when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job));
when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod);
when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse(
job,
PeonPhase.SUCCEEDED
));
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
@ -427,6 +440,31 @@ class KubernetesTaskRunnerTest
)); ));
} }
@Test
void testWorkItemGetLocation()
{
KubernetesPeonClient client = mock(KubernetesPeonClient.class);
Pod pod = mock(Pod.class);
PodStatus status = mock(PodStatus.class);
when(status.getPodIP()).thenReturn(null).thenReturn("tweak");
when(pod.getStatus()).thenReturn(status);
ObjectMeta metadata = mock(ObjectMeta.class);
when(metadata.getAnnotations()).thenReturn(ImmutableMap.of(DruidK8sConstants.TLS_ENABLED, "false"));
when(pod.getMetadata()).thenReturn(metadata);
when(client.getMainJobPod(any())).thenReturn(pod);
Task task = mock(Task.class);
when(task.getId()).thenReturn("butters");
KubernetesTaskRunner.K8sWorkItem k8sWorkItem = new KubernetesTaskRunner.K8sWorkItem(client, task, null);
TaskLocation location = k8sWorkItem.getLocation();
assertEquals(TaskLocation.unknown(), location);
TaskLocation realLocation = k8sWorkItem.getLocation();
assertEquals(TaskLocation.create("tweak", DruidK8sConstants.PORT, DruidK8sConstants.TLS_PORT, false), realLocation);
}
private Task makeTask() private Task makeTask()
{ {
return new TestableNoopTask( return new TestableNoopTask(