Fix issues with null pointers on jobResponse (#14010)

* Fix issues with null pointers on jobResponse

* fix unit tests

* Update extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>

* nullable

* fix error message

---------

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
This commit is contained in:
George Shiqi Wu 2023-04-04 20:48:18 -04:00 committed by GitHub
parent 7e572eef08
commit f60f377e5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 79 additions and 6 deletions

View File

@ -180,6 +180,11 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
TaskStatus status; TaskStatus status;
if (PeonPhase.SUCCEEDED.equals(completedPhase.getPhase())) { if (PeonPhase.SUCCEEDED.equals(completedPhase.getPhase())) {
status = TaskStatus.success(task.getId()); status = TaskStatus.success(task.getId());
} else if (completedPhase.getJob() == null) {
status = TaskStatus.failure(
task.getId(),
"K8s Job for task disappeared before completion: " + k8sTaskId
);
} else { } else {
status = TaskStatus.failure( status = TaskStatus.failure(
task.getId(), task.getId(),

View File

@ -111,7 +111,8 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
unit unit
); );
if (job == null) { if (job == null) {
return new JobResponse(job, PeonPhase.FAILED); log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", taskId);
return new JobResponse(null, PeonPhase.FAILED);
} }
if (job.getStatus().getSucceeded() != null) { if (job.getStatus().getSucceeded() != null) {
return new JobResponse(job, PeonPhase.SUCCEEDED); return new JobResponse(job, PeonPhase.SUCCEEDED);

View File

@ -26,6 +26,8 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.joda.time.Period; import org.joda.time.Period;
import org.joda.time.PeriodType; import org.joda.time.PeriodType;
import javax.annotation.Nullable;
public class JobResponse public class JobResponse
{ {
@ -35,7 +37,7 @@ public class JobResponse
private final Job job; private final Job job;
private final PeonPhase phase; private final PeonPhase phase;
public JobResponse(Job job, PeonPhase phase) public JobResponse(@Nullable Job job, PeonPhase phase)
{ {
this.job = job; this.job = job;
this.phase = phase; this.phase = phase;
@ -54,6 +56,7 @@ public class JobResponse
public Optional<Long> getJobDuration() public Optional<Long> getJobDuration()
{ {
Optional<Long> duration = Optional.absent(); Optional<Long> duration = Optional.absent();
String jobName = job != null && job.getMetadata() != null ? job.getMetadata().getName() : "";
try { try {
if (job != null && job.getStatus() != null if (job != null && job.getStatus() != null
&& job.getStatus().getStartTime() != null && job.getStatus().getStartTime() != null
@ -66,12 +69,12 @@ public class JobResponse
} }
} }
catch (Exception e) { catch (Exception e) {
LOGGER.error(e, "Error calculating duration for job: %s", job.getMetadata().getName()); LOGGER.error(e, "Error calculating duration for job: %s", jobName);
} }
if (duration.isPresent()) { if (duration.isPresent()) {
LOGGER.info("Duration for Job: %s was %d seconds", job.getMetadata().getName(), duration.get()); LOGGER.info("Duration for Job: %s was %d seconds", jobName, duration.get());
} else { } else {
LOGGER.info("Unable to calcuate duration for Job: %s", job.getMetadata().getName()); LOGGER.info("Unable to calcuate duration for Job: %s", jobName);
} }
return duration; return duration;
} }

View File

@ -38,6 +38,7 @@ import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.guice.FirehoseModule; import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.indexing.common.task.IndexTask;
@ -694,6 +695,62 @@ public class KubernetesTaskRunnerTest
assertEquals(TaskLocation.unknown(), location); assertEquals(TaskLocation.unknown(), location);
} }
@Test
public void testK8sJobManualShutdown() throws Exception
{
  // Scenario: the runner is waiting on a job, and the peon client reports completion with
  // a null Job and a FAILED phase. The runner is expected to report a FAILED task status
  // carrying the "disappeared before completion" message.
  final Task noopTask = makeTask();
  final K8sTaskId taskId = new K8sTaskId(noopTask.getId());

  // Mocked K8s job: named after the task and reporting one active pod.
  final ObjectMeta jobMeta = mock(ObjectMeta.class);
  when(jobMeta.getName()).thenReturn(taskId.getK8sTaskId());
  final JobStatus activeJobStatus = mock(JobStatus.class);
  when(activeJobStatus.getActive()).thenReturn(1);
  final Job k8sJob = mock(Job.class);
  when(k8sJob.getStatus()).thenReturn(activeJobStatus);
  when(k8sJob.getMetadata()).thenReturn(jobMeta);

  // Mocked peon pod: has a name, a creation timestamp and an IP.
  final ObjectMeta podMeta = mock(ObjectMeta.class);
  when(podMeta.getName()).thenReturn("peonPodName");
  when(podMeta.getCreationTimestamp()).thenReturn(DateTimes.nowUtc().toString());
  final PodStatus peonPodStatus = mock(PodStatus.class);
  when(peonPodStatus.getPodIP()).thenReturn("SomeIP");
  final Pod pod = mock(Pod.class);
  when(pod.getMetadata()).thenReturn(podMeta);
  when(pod.getStatus()).thenReturn(peonPodStatus);

  // Adapter translates between the task and its K8s representations.
  final K8sTaskAdapter taskAdapter = mock(K8sTaskAdapter.class);
  when(taskAdapter.fromTask(eq(noopTask))).thenReturn(k8sJob);
  when(taskAdapter.toTask(eq(pod))).thenReturn(noopTask);

  // Client returns a null job if the job has been deleted
  final JobResponse missingJobResponse = new JobResponse(null, PeonPhase.FAILED);
  final DruidKubernetesPeonClient client = mock(DruidKubernetesPeonClient.class);
  when(client.jobExists(eq(taskId))).thenReturn(Optional.of(k8sJob));
  when(client.getMainJobPod(eq(taskId))).thenReturn(pod);
  when(client.waitForJobCompletion(eq(taskId), anyLong(), isA(TimeUnit.class)))
      .thenReturn(missingJobResponse);
  when(client.getPeonLogs(eq(taskId))).thenReturn(Optional.absent());
  when(client.cleanUpJob(eq(taskId))).thenReturn(true);

  final KubernetesTaskRunner runner = spy(
      new KubernetesTaskRunner(
          taskAdapter,
          kubernetesTaskRunnerConfig,
          taskQueueConfig,
          taskLogPusher,
          client,
          null
      )
  );

  // Block until the runner has resolved the task's final status.
  final TaskStatus result = runner.run(noopTask).get();

  Assert.assertEquals(TaskState.FAILED, result.getStatusCode());
  Assert.assertEquals(
      "K8s Job for task disappeared before completion: [ k8sTaskId, k8staskid]",
      result.getErrorMsg()
  );
}
private Task makeTask() private Task makeTask()
{ {
return new TestableNoopTask( return new TestableNoopTask(
@ -711,7 +768,6 @@ public class KubernetesTaskRunnerTest
) )
); );
} }
private static class TestableNoopTask extends NoopTask private static class TestableNoopTask extends NoopTask
{ {
TestableNoopTask( TestableNoopTask(

View File

@ -75,4 +75,12 @@ class JobResponseTest
Optional<Long> duration = response.getJobDuration(); Optional<Long> duration = response.getJobDuration();
Assertions.assertFalse(duration.isPresent()); Assertions.assertFalse(duration.isPresent());
} }
@Test
void testNullJob()
{
  // A response built around a null Job must yield an absent duration rather than throwing.
  final JobResponse nullJobResponse = new JobResponse(null, PeonPhase.SUCCEEDED);
  final boolean hasDuration = nullJobResponse.getJobDuration().isPresent();
  Assertions.assertFalse(hasDuration);
}
} }