From 61a35262ec0afac578fd0a39f87a695791644225 Mon Sep 17 00:00:00 2001 From: Nicholas Lippis Date: Thu, 30 Mar 2023 01:00:22 -0400 Subject: [PATCH] Kubernetes task runner live reports (#13986) Implement Live Reports for the KubernetesTaskRunner --- .../kubernetes-overlord-extensions/pom.xml | 6 + .../k8s/overlord/KubernetesTaskRunner.java | 48 +++- .../overlord/KubernetesTaskRunnerFactory.java | 8 +- .../KubernetesTaskRunnerFactoryTest.java | 8 + .../overlord/KubernetesTaskRunnerTest.java | 264 +++++++++++++++++- 5 files changed, 323 insertions(+), 11 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/pom.xml b/extensions-contrib/kubernetes-overlord-extensions/pom.xml index f2a775ec2db..ef58f261ab0 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/pom.xml +++ b/extensions-contrib/kubernetes-overlord-extensions/pom.xml @@ -79,6 +79,12 @@ commons-lang3 provided + + io.netty + netty + ${netty3.version} + provided + io.netty netty-common diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 7f9e92a9fa3..5b76329cdde 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -21,6 +21,7 @@ package org.apache.druid.k8s.overlord; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; @@ -46,6 +47,9 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; import org.apache.druid.k8s.overlord.common.JobResponse; import org.apache.druid.k8s.overlord.common.K8sTaskId; @@ -55,11 +59,13 @@ import org.apache.druid.k8s.overlord.common.PeonPhase; import org.apache.druid.k8s.overlord.common.TaskAdapter; import org.apache.druid.tasklogs.TaskLogPusher; import org.apache.druid.tasklogs.TaskLogStreamer; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; +import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -69,6 +75,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -99,6 +106,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner private final TaskLogPusher taskLogPusher; private final ListeningExecutorService exec; private final KubernetesPeonClient client; + private final HttpClient httpClient; public KubernetesTaskRunner( @@ -106,7 +114,8 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner KubernetesTaskRunnerConfig k8sConfig, TaskQueueConfig taskQueueConfig, TaskLogPusher taskLogPusher, - KubernetesPeonClient client + KubernetesPeonClient client, + HttpClient httpClient ) { this.adapter = adapter; @@ -114,6 +123,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner this.taskQueueConfig = taskQueueConfig; this.taskLogPusher = taskLogPusher; this.client = client; + this.httpClient = httpClient; this.cleanupExecutor = Executors.newScheduledThreadPool(1); this.exec = MoreExecutors.listeningDecorator( Execs.multiThreaded(taskQueueConfig.getMaxSize(), "k8s-task-runner-%d") @@ -245,11 +255,41 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner client.cleanUpJob(new K8sTaskId(taskid)); } - @Override - public Optional streamTaskReports(String taskid) + public Optional streamTaskReports(String taskid) throws IOException { - return Optional.absent(); + final K8sWorkItem workItem = tasks.get(taskid); + if (workItem == null) { + return Optional.absent(); + } + + final TaskLocation taskLocation = workItem.getLocation(); + + if (TaskLocation.unknown().equals(taskLocation)) { + // No location known for this task. It may have not been assigned one yet. + return Optional.absent(); + } + + final URL url = TaskRunnerUtils.makeTaskLocationURL( + taskLocation, + "/druid/worker/v1/chat/%s/liveReports", + taskid + ); + + try { + return Optional.of(httpClient.go( + new Request(HttpMethod.GET, url), + new InputStreamResponseHandler() + ).get()); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + catch (ExecutionException e) { + // Unwrap if possible + Throwables.propagateIfPossible(e.getCause(), IOException.class); + throw new RuntimeException(e); + } } @Override diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java index e86e1700aaa..cae72372317 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java @@ -24,12 +24,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import io.fabric8.kubernetes.client.Config; import org.apache.druid.guice.IndexingServiceModuleHelper; +import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; import org.apache.druid.k8s.overlord.common.DruidKubernetesPeonClient; import org.apache.druid.k8s.overlord.common.MultiContainerTaskAdapter; @@ -47,6 +49,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory workItems = spyRunner.getKnownTasks(); @@ -332,7 +344,8 @@ public class KubernetesTaskRunnerTest kubernetesTaskRunnerConfig, taskQueueConfig, taskLogPusher, - peonClient + peonClient, + null ); KubernetesTaskRunner spyRunner = spy(taskRunner); Collection workItems = spyRunner.getKnownTasks(); @@ -356,6 +369,243 @@ public class KubernetesTaskRunnerTest verify(spyRunner, times(1)).run(eq(task)); } + @Test + public void testStreamTaskReports() throws Exception + { + Task task = makeTask(); + K8sTaskId k8sTaskId = new K8sTaskId(task.getId()); + + Job job = mock(Job.class); + ObjectMeta jobMetadata = mock(ObjectMeta.class); + when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId()); + JobStatus status = mock(JobStatus.class); + when(status.getActive()).thenReturn(1).thenReturn(null); + when(job.getStatus()).thenReturn(status); + when(job.getMetadata()).thenReturn(jobMetadata); + + Pod peonPod = mock(Pod.class); + ObjectMeta metadata = mock(ObjectMeta.class); + when(metadata.getName()).thenReturn("peonPodName"); + when(peonPod.getMetadata()).thenReturn(metadata); + PodStatus podStatus = mock(PodStatus.class); + when(podStatus.getPodIP()).thenReturn("SomeIP"); + when(peonPod.getStatus()).thenReturn(podStatus); + + K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); + when(adapter.fromTask(eq(task))).thenReturn(job); + + DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); + + when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null)); + when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); + when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); + when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse( + job, + PeonPhase.SUCCEEDED + )); + + HttpClient httpClient = mock(HttpClient.class); + when(httpClient.go(isA(Request.class), isA(InputStreamResponseHandler.class))).thenReturn( + Futures.immediateFuture(IOUtils.toInputStream("{}", StandardCharsets.UTF_8)) + ); + + KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + adapter, + kubernetesTaskRunnerConfig, + taskQueueConfig, + taskLogPusher, + peonClient, + httpClient + ); + taskRunner.run(task); + Optional maybeInputStream = taskRunner.streamTaskReports(task.getId()); + + Assert.assertTrue(maybeInputStream.isPresent()); + InputStream report = maybeInputStream.get(); + Assert.assertEquals("{}", IOUtils.toString(report, StandardCharsets.UTF_8)); + } + + @Test + public void testStreamTaskReports_whereJobDoesNotExist_returnsEmptyOptional() throws Exception + { + Task task = makeTask(); + + KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + mock(K8sTaskAdapter.class), + kubernetesTaskRunnerConfig, + taskQueueConfig, + taskLogPusher, + mock(DruidKubernetesPeonClient.class), + mock(HttpClient.class) + ); + + Optional maybeInputStream = taskRunner.streamTaskReports(task.getId()); + + Assert.assertFalse(maybeInputStream.isPresent()); + } + + @Test + public void testStreamTaskReports_withoutEmptyLocation_returnsEmptyOptional() throws Exception + { + Task task = makeTask(); + K8sTaskId k8sTaskId = new K8sTaskId(task.getId()); + + Job job = mock(Job.class); + ObjectMeta jobMetadata = mock(ObjectMeta.class); + when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId()); + JobStatus status = mock(JobStatus.class); + when(status.getActive()).thenReturn(1).thenReturn(null); + when(job.getStatus()).thenReturn(status); + when(job.getMetadata()).thenReturn(jobMetadata); + + Pod peonPod = mock(Pod.class); + ObjectMeta metadata = mock(ObjectMeta.class); + when(metadata.getName()).thenReturn("peonPodName"); + when(peonPod.getMetadata()).thenReturn(metadata); + + K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); + when(adapter.fromTask(eq(task))).thenReturn(job); + + DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); + + when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null)); + when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); + when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); + when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse( + job, + PeonPhase.SUCCEEDED + )); + + HttpClient httpClient = mock(HttpClient.class); + ListenableFuture future = mock(ListenableFuture.class); + + when(httpClient.go(isA(Request.class), isA(InputStreamResponseHandler.class))).thenReturn(future); + when(future.get()).thenThrow(InterruptedException.class); + + KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + adapter, + kubernetesTaskRunnerConfig, + taskQueueConfig, + taskLogPusher, + peonClient, + httpClient + ); + + taskRunner.run(task); + + Assert.assertFalse(taskRunner.streamTaskReports(task.getId()).isPresent()); + } + + @Test + public void testStreamTaskReports_getInputStreamThrowsInterruptedException_throwsRuntimeException() throws Exception + { + Task task = makeTask(); + K8sTaskId k8sTaskId = new K8sTaskId(task.getId()); + + Job job = mock(Job.class); + ObjectMeta jobMetadata = mock(ObjectMeta.class); + when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId()); + JobStatus status = mock(JobStatus.class); + when(status.getActive()).thenReturn(1).thenReturn(null); + when(job.getStatus()).thenReturn(status); + when(job.getMetadata()).thenReturn(jobMetadata); + + Pod peonPod = mock(Pod.class); + ObjectMeta metadata = mock(ObjectMeta.class); + when(metadata.getName()).thenReturn("peonPodName"); + when(peonPod.getMetadata()).thenReturn(metadata); + PodStatus podStatus = mock(PodStatus.class); + when(podStatus.getPodIP()).thenReturn("SomeIP"); + when(peonPod.getStatus()).thenReturn(podStatus); + + K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); + when(adapter.fromTask(eq(task))).thenReturn(job); + + DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); + + when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null)); + when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); + when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); + when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse( + job, + PeonPhase.SUCCEEDED + )); + + HttpClient httpClient = mock(HttpClient.class); + ListenableFuture future = mock(ListenableFuture.class); + + when(httpClient.go(isA(Request.class), isA(InputStreamResponseHandler.class))).thenReturn(future); + when(future.get()).thenThrow(InterruptedException.class); + + KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + adapter, + kubernetesTaskRunnerConfig, + taskQueueConfig, + taskLogPusher, + peonClient, + httpClient + ); + + taskRunner.run(task); + + Assert.assertThrows(RuntimeException.class, () -> taskRunner.streamTaskReports(task.getId())); + } + + @Test + public void testStreamTaskReports_getInputStreamThrowsExecutionException_throwsRuntimeException() throws Exception + { + Task task = makeTask(); + K8sTaskId k8sTaskId = new K8sTaskId(task.getId()); + + Job job = mock(Job.class); + ObjectMeta jobMetadata = mock(ObjectMeta.class); + when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId()); + JobStatus status = mock(JobStatus.class); + when(status.getActive()).thenReturn(1).thenReturn(null); + when(job.getStatus()).thenReturn(status); + when(job.getMetadata()).thenReturn(jobMetadata); + + Pod peonPod = mock(Pod.class); + ObjectMeta metadata = mock(ObjectMeta.class); + when(metadata.getName()).thenReturn("peonPodName"); + when(peonPod.getMetadata()).thenReturn(metadata); + PodStatus podStatus = mock(PodStatus.class); + when(podStatus.getPodIP()).thenReturn("SomeIP"); + when(peonPod.getStatus()).thenReturn(podStatus); + + K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); + when(adapter.fromTask(eq(task))).thenReturn(job); + + DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); + + when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null)); + when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); + when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); + when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse( + job, + PeonPhase.SUCCEEDED + )); + + HttpClient httpClient = mock(HttpClient.class); + ListenableFuture future = mock(ListenableFuture.class); + + when(httpClient.go(isA(Request.class), isA(InputStreamResponseHandler.class))).thenReturn(future); + when(future.get()).thenThrow(ExecutionException.class); + + KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + adapter, + kubernetesTaskRunnerConfig, + taskQueueConfig, + taskLogPusher, + peonClient, + httpClient + ); + + taskRunner.run(task); + + Assert.assertThrows(RuntimeException.class, () -> taskRunner.streamTaskReports(task.getId())); + } + @Test public void testMakingCodeCoverageHappy() { @@ -372,7 +622,8 @@ public class KubernetesTaskRunnerTest kubernetesTaskRunnerConfig, taskQueueConfig, taskLogPusher, - peonClient + peonClient, + null ); RunnerTaskState state = taskRunner.getRunnerTaskState("foo"); @@ -399,7 +650,8 @@ public class KubernetesTaskRunnerTest kubernetesTaskRunnerConfig, taskQueueConfig, taskLogPusher, - mock(DruidKubernetesPeonClient.class) + mock(DruidKubernetesPeonClient.class), + null )); }