Kubernetes task runner live reports (#13986)

Implement Live Reports for the KubernetesTaskRunner
This commit is contained in:
Nicholas Lippis 2023-03-30 01:00:22 -04:00 committed by GitHub
parent 44abe2b96f
commit 61a35262ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 323 additions and 11 deletions

View File

@ -79,6 +79,12 @@
<artifactId>commons-lang3</artifactId>
<scope>provided</scope>
</dependency>
<!-- Netty 3 artifact (version taken from the netty3.version property), declared with provided scope.
     KubernetesTaskRunner imports org.jboss.netty.handler.codec.http.HttpMethod to build the
     HTTP request it sends when streaming live task reports. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>${netty3.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>

View File

@ -21,6 +21,7 @@ package org.apache.druid.k8s.overlord;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
@ -46,6 +47,9 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
@ -55,11 +59,13 @@ import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.k8s.overlord.common.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@ -69,6 +75,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -99,6 +106,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
private final TaskLogPusher taskLogPusher;
private final ListeningExecutorService exec;
private final KubernetesPeonClient client;
private final HttpClient httpClient;
public KubernetesTaskRunner(
@ -106,7 +114,8 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
KubernetesTaskRunnerConfig k8sConfig,
TaskQueueConfig taskQueueConfig,
TaskLogPusher taskLogPusher,
KubernetesPeonClient client
KubernetesPeonClient client,
HttpClient httpClient
)
{
this.adapter = adapter;
@ -114,6 +123,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
this.taskQueueConfig = taskQueueConfig;
this.taskLogPusher = taskLogPusher;
this.client = client;
this.httpClient = httpClient;
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
this.exec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(taskQueueConfig.getMaxSize(), "k8s-task-runner-%d")
@ -245,11 +255,41 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
client.cleanUpJob(new K8sTaskId(taskid));
}
/**
 * Streams the live report of a task tracked by this runner.
 *
 * <p>Looks the task up in {@code tasks}; when it is tracked and has a known location, issues a GET to
 * {@code /druid/worker/v1/chat/<taskid>/liveReports} at that location and blocks until the response
 * future completes.
 *
 * @param taskid id of the task whose live report is requested
 * @return the response body as a stream, or {@code Optional.absent()} when no work item is registered
 *         for {@code taskid} or its location equals {@code TaskLocation.unknown()}
 * @throws IOException when the request future fails with an {@code IOException} cause
 * @throws RuntimeException when the wait is interrupted, or when the future fails with a cause that
 *         could not be rethrown as-is (the {@code ExecutionException} is wrapped)
 */
@Override
// NOTE(review): the next line appears to be the pre-change signature kept by the flattened diff;
// the signature that follows it (declaring IOException) is the one this commit introduces.
public Optional<InputStream> streamTaskReports(String taskid)
public Optional<InputStream> streamTaskReports(String taskid) throws IOException
{
// NOTE(review): this unconditional return appears to be the pre-change body kept by the flattened diff.
return Optional.absent();
final K8sWorkItem workItem = tasks.get(taskid);
if (workItem == null) {
// This runner is not tracking the task, so there is nothing to stream.
return Optional.absent();
}
final TaskLocation taskLocation = workItem.getLocation();
if (TaskLocation.unknown().equals(taskLocation)) {
// No location is known for this task. It may not have been assigned one yet.
return Optional.absent();
}
// Build the URL of the task's liveReports endpoint from its location.
final URL url = TaskRunnerUtils.makeTaskLocationURL(
taskLocation,
"/druid/worker/v1/chat/%s/liveReports",
taskid
);
try {
// get() blocks the calling thread until the HTTP response future completes.
return Optional.of(httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get());
}
catch (InterruptedException e) {
// NOTE(review): the thread's interrupt status is not restored before wrapping; consider
// calling Thread.currentThread().interrupt() here so callers can still observe the interrupt.
throw new RuntimeException(e);
}
catch (ExecutionException e) {
// Unwrap if possible: rethrow the cause directly when it is an IOException (or an unchecked
// throwable); anything else falls through and is wrapped below.
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e);
}
}
@Override

View File

@ -24,12 +24,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.fabric8.kubernetes.client.Config;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.MultiContainerTaskAdapter;
@ -47,6 +49,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
{
public static final String TYPE_NAME = "k8s";
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
private final KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
private final StartupLoggingConfig startupLoggingConfig;
private final TaskQueueConfig taskQueueConfig;
@ -60,6 +63,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
@Inject
public KubernetesTaskRunnerFactory(
@Smile ObjectMapper smileMapper,
@EscalatedGlobal final HttpClient httpClient,
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
StartupLoggingConfig startupLoggingConfig,
@JacksonInject TaskQueueConfig taskQueueConfig,
@ -71,6 +75,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
{
this.smileMapper = smileMapper;
this.httpClient = httpClient;
this.kubernetesTaskRunnerConfig = kubernetesTaskRunnerConfig;
this.startupLoggingConfig = startupLoggingConfig;
this.taskQueueConfig = taskQueueConfig;
@ -98,7 +103,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
new DruidKubernetesPeonClient(client, kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs)
new DruidKubernetesPeonClient(client, kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs),
httpClient
);
return runner;
}

View File

@ -97,6 +97,7 @@ public class KubernetesTaskRunnerFactoryTest
{
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
null,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
@ -117,6 +118,7 @@ public class KubernetesTaskRunnerFactoryTest
{
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
null,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
@ -139,6 +141,7 @@ public class KubernetesTaskRunnerFactoryTest
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
null,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
@ -162,6 +165,7 @@ public class KubernetesTaskRunnerFactoryTest
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
null,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
@ -187,6 +191,7 @@ public class KubernetesTaskRunnerFactoryTest
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
null,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
@ -213,6 +218,7 @@ public class KubernetesTaskRunnerFactoryTest
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
null,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
@ -236,6 +242,7 @@ public class KubernetesTaskRunnerFactoryTest
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
null,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
@ -262,6 +269,7 @@ public class KubernetesTaskRunnerFactoryTest
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
null,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,

View File

@ -26,12 +26,14 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodStatus;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexer.RunnerTaskState;
@ -46,6 +48,9 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.DruidKubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.JobResponse;
@ -58,12 +63,16 @@ import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@ -151,7 +160,8 @@ public class KubernetesTaskRunnerTest
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
peonClient
peonClient,
null
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
@ -205,7 +215,8 @@ public class KubernetesTaskRunnerTest
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
peonClient
peonClient,
null
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
@ -270,7 +281,8 @@ public class KubernetesTaskRunnerTest
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
peonClient
peonClient,
null
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
Collection<? extends TaskRunnerWorkItem> workItems = spyRunner.getKnownTasks();
@ -332,7 +344,8 @@ public class KubernetesTaskRunnerTest
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
peonClient
peonClient,
null
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
Collection<? extends TaskRunnerWorkItem> workItems = spyRunner.getKnownTasks();
@ -356,6 +369,243 @@ public class KubernetesTaskRunnerTest
verify(spyRunner, times(1)).run(eq(task));
}
/**
 * Happy path: after the task is submitted, streamTaskReports returns the stream produced by the
 * HTTP client, and its content is passed through unchanged.
 */
@Test
public void testStreamTaskReports() throws Exception
{
  final Task task = makeTask();
  final K8sTaskId k8sTaskId = new K8sTaskId(task.getId());

  // Job mock: getActive() answers 1 on the first call and null on later calls.
  final JobStatus jobStatus = mock(JobStatus.class);
  when(jobStatus.getActive()).thenReturn(1).thenReturn(null);
  final ObjectMeta jobMeta = mock(ObjectMeta.class);
  when(jobMeta.getName()).thenReturn(k8sTaskId.getK8sTaskId());
  final Job job = mock(Job.class);
  when(job.getMetadata()).thenReturn(jobMeta);
  when(job.getStatus()).thenReturn(jobStatus);

  // Peon pod mock exposing a pod IP (presumably what gives the work item a known location).
  final PodStatus peonPodStatus = mock(PodStatus.class);
  when(peonPodStatus.getPodIP()).thenReturn("SomeIP");
  final ObjectMeta peonPodMeta = mock(ObjectMeta.class);
  when(peonPodMeta.getName()).thenReturn("peonPodName");
  final Pod peonPod = mock(Pod.class);
  when(peonPod.getStatus()).thenReturn(peonPodStatus);
  when(peonPod.getMetadata()).thenReturn(peonPodMeta);

  final K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
  when(adapter.fromTask(eq(task))).thenReturn(job);

  // Kubernetes client mock: no pre-existing job, the launch yields the peon pod, the job succeeds.
  final JobResponse succeeded = new JobResponse(job, PeonPhase.SUCCEEDED);
  final DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
  when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.absent());
  when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class)))
      .thenReturn(peonPod);
  when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
  when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class)))
      .thenReturn(succeeded);

  // HTTP client mock answering every request with an already-completed future holding "{}".
  final InputStream reportBody = IOUtils.toInputStream("{}", StandardCharsets.UTF_8);
  final ListenableFuture<InputStream> response = Futures.immediateFuture(reportBody);
  final HttpClient httpClient = mock(HttpClient.class);
  when(httpClient.go(isA(Request.class), isA(InputStreamResponseHandler.class))).thenReturn(response);

  final KubernetesTaskRunner runner = new KubernetesTaskRunner(
      adapter,
      kubernetesTaskRunnerConfig,
      taskQueueConfig,
      taskLogPusher,
      peonClient,
      httpClient
  );
  runner.run(task);

  final Optional<InputStream> reports = runner.streamTaskReports(task.getId());
  Assert.assertTrue(reports.isPresent());
  Assert.assertEquals("{}", IOUtils.toString(reports.get(), StandardCharsets.UTF_8));
}
/**
 * A task id that was never submitted to the runner has no work item, so no report stream is returned.
 */
@Test
public void testStreamTaskReports_whereJobDoesNotExist_returnsEmptyOptional() throws Exception
{
  // The task is created but deliberately never passed to run().
  final Task neverSubmitted = makeTask();

  final K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
  final DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
  final HttpClient httpClient = mock(HttpClient.class);
  final KubernetesTaskRunner runner = new KubernetesTaskRunner(
      adapter,
      kubernetesTaskRunnerConfig,
      taskQueueConfig,
      taskLogPusher,
      peonClient,
      httpClient
  );

  Assert.assertFalse(runner.streamTaskReports(neverSubmitted.getId()).isPresent());
}
/**
 * A submitted task whose location is not known yet yields an absent Optional, and the peon is never
 * contacted over HTTP.
 *
 * <p>The peon pod mock has no status stubbed, which is presumably what makes the work item report an
 * unknown location (the location lookup itself is outside this test).
 *
 * <p>NOTE: despite "withoutEmptyLocation" in the name, this covers the case where the location IS
 * empty/unknown. The name is kept so existing test filters and reports keep matching.
 */
@Test
public void testStreamTaskReports_withoutEmptyLocation_returnsEmptyOptional() throws Exception
{
  Task task = makeTask();
  K8sTaskId k8sTaskId = new K8sTaskId(task.getId());

  Job job = mock(Job.class);
  ObjectMeta jobMetadata = mock(ObjectMeta.class);
  when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId());
  JobStatus status = mock(JobStatus.class);
  when(status.getActive()).thenReturn(1).thenReturn(null);
  when(job.getStatus()).thenReturn(status);
  when(job.getMetadata()).thenReturn(jobMetadata);

  // Deliberately no PodStatus stubbing: the mock's getStatus() returns null.
  Pod peonPod = mock(Pod.class);
  ObjectMeta metadata = mock(ObjectMeta.class);
  when(metadata.getName()).thenReturn("peonPodName");
  when(peonPod.getMetadata()).thenReturn(metadata);

  K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
  when(adapter.fromTask(eq(task))).thenReturn(job);

  DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
  when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null));
  when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod);
  when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
  when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse(
      job,
      PeonPhase.SUCCEEDED
  ));

  // No behaviour is stubbed on the HTTP client: the absent result must come from the location check,
  // not from a failed or interrupted request.
  HttpClient httpClient = mock(HttpClient.class);

  KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
      adapter,
      kubernetesTaskRunnerConfig,
      taskQueueConfig,
      taskLogPusher,
      peonClient,
      httpClient
  );
  taskRunner.run(task);

  Assert.assertFalse(taskRunner.streamTaskReports(task.getId()).isPresent());
  // Pin the short-circuit: no request may be issued when the location is unknown.
  verify(httpClient, times(0)).go(isA(Request.class), isA(InputStreamResponseHandler.class));
}
/**
 * When waiting on the HTTP response future is interrupted, streamTaskReports surfaces the failure
 * as a RuntimeException.
 */
@Test
public void testStreamTaskReports_getInputStreamThrowsInterruptedException_throwsRuntimeException() throws Exception
{
  final Task task = makeTask();
  final K8sTaskId k8sTaskId = new K8sTaskId(task.getId());

  // Job mock: getActive() answers 1 on the first call and null on later calls.
  final JobStatus jobStatus = mock(JobStatus.class);
  when(jobStatus.getActive()).thenReturn(1).thenReturn(null);
  final ObjectMeta jobMeta = mock(ObjectMeta.class);
  when(jobMeta.getName()).thenReturn(k8sTaskId.getK8sTaskId());
  final Job job = mock(Job.class);
  when(job.getMetadata()).thenReturn(jobMeta);
  when(job.getStatus()).thenReturn(jobStatus);

  // Peon pod mock exposing a pod IP so the HTTP request path is reached.
  final PodStatus peonPodStatus = mock(PodStatus.class);
  when(peonPodStatus.getPodIP()).thenReturn("SomeIP");
  final ObjectMeta peonPodMeta = mock(ObjectMeta.class);
  when(peonPodMeta.getName()).thenReturn("peonPodName");
  final Pod peonPod = mock(Pod.class);
  when(peonPod.getStatus()).thenReturn(peonPodStatus);
  when(peonPod.getMetadata()).thenReturn(peonPodMeta);

  final K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
  when(adapter.fromTask(eq(task))).thenReturn(job);

  final JobResponse succeeded = new JobResponse(job, PeonPhase.SUCCEEDED);
  final DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
  when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.absent());
  when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class)))
      .thenReturn(peonPod);
  when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
  when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class)))
      .thenReturn(succeeded);

  // The response future throws InterruptedException as soon as the runner blocks on it.
  final ListenableFuture<InputStream> interruptedResponse = mock(ListenableFuture.class);
  when(interruptedResponse.get()).thenThrow(InterruptedException.class);
  final HttpClient httpClient = mock(HttpClient.class);
  when(httpClient.go(isA(Request.class), isA(InputStreamResponseHandler.class)))
      .thenReturn(interruptedResponse);

  final KubernetesTaskRunner runner = new KubernetesTaskRunner(
      adapter,
      kubernetesTaskRunnerConfig,
      taskQueueConfig,
      taskLogPusher,
      peonClient,
      httpClient
  );
  runner.run(task);

  final String taskId = task.getId();
  Assert.assertThrows(RuntimeException.class, () -> runner.streamTaskReports(taskId));
}
/**
 * When the HTTP response future fails with an ExecutionException, streamTaskReports surfaces the
 * failure as a RuntimeException.
 */
@Test
public void testStreamTaskReports_getInputStreamThrowsExecutionException_throwsRuntimeException() throws Exception
{
  final Task task = makeTask();
  final K8sTaskId k8sTaskId = new K8sTaskId(task.getId());

  // Job mock: getActive() answers 1 on the first call and null on later calls.
  final JobStatus jobStatus = mock(JobStatus.class);
  when(jobStatus.getActive()).thenReturn(1).thenReturn(null);
  final ObjectMeta jobMeta = mock(ObjectMeta.class);
  when(jobMeta.getName()).thenReturn(k8sTaskId.getK8sTaskId());
  final Job job = mock(Job.class);
  when(job.getMetadata()).thenReturn(jobMeta);
  when(job.getStatus()).thenReturn(jobStatus);

  // Peon pod mock exposing a pod IP so the HTTP request path is reached.
  final PodStatus peonPodStatus = mock(PodStatus.class);
  when(peonPodStatus.getPodIP()).thenReturn("SomeIP");
  final ObjectMeta peonPodMeta = mock(ObjectMeta.class);
  when(peonPodMeta.getName()).thenReturn("peonPodName");
  final Pod peonPod = mock(Pod.class);
  when(peonPod.getStatus()).thenReturn(peonPodStatus);
  when(peonPod.getMetadata()).thenReturn(peonPodMeta);

  final K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
  when(adapter.fromTask(eq(task))).thenReturn(job);

  final JobResponse succeeded = new JobResponse(job, PeonPhase.SUCCEEDED);
  final DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
  when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.absent());
  when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class)))
      .thenReturn(peonPod);
  when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
  when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class)))
      .thenReturn(succeeded);

  // The response future throws ExecutionException as soon as the runner blocks on it.
  final ListenableFuture<InputStream> failedResponse = mock(ListenableFuture.class);
  when(failedResponse.get()).thenThrow(ExecutionException.class);
  final HttpClient httpClient = mock(HttpClient.class);
  when(httpClient.go(isA(Request.class), isA(InputStreamResponseHandler.class)))
      .thenReturn(failedResponse);

  final KubernetesTaskRunner runner = new KubernetesTaskRunner(
      adapter,
      kubernetesTaskRunnerConfig,
      taskQueueConfig,
      taskLogPusher,
      peonClient,
      httpClient
  );
  runner.run(task);

  final String taskId = task.getId();
  Assert.assertThrows(RuntimeException.class, () -> runner.streamTaskReports(taskId));
}
@Test
public void testMakingCodeCoverageHappy()
{
@ -372,7 +622,8 @@ public class KubernetesTaskRunnerTest
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
peonClient
peonClient,
null
);
RunnerTaskState state = taskRunner.getRunnerTaskState("foo");
@ -399,7 +650,8 @@ public class KubernetesTaskRunnerTest
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
mock(DruidKubernetesPeonClient.class)
mock(DruidKubernetesPeonClient.class),
null
));
}