Fix bug in k8s task runner in handling deleted jobs (#14001)

With the KubernetesTaskRunner, if a running task is manually shut down via the web console, or if the corresponding k8s job is manually deleted, the thread responsible for overseeing the task gets stuck: when the job is deleted, the fabric8 client emits a single event with a null job, which does not satisfy the wait condition, and no further events arrive.

The thread then waits for a fabric8 event (the job succeeding) that will never arrive, until maxTaskDuration expires (default 4 hours). For users of the extension running with a limited taskQueue maxSize, this causes real problems: the k8s executor pool cannot pick up additional tasks, because its threads are stuck waiting on old tasks whose jobs have already been deleted.
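
The crux is fabric8's waitUntilCondition: when the watched resource is deleted, the client invokes the predicate exactly once with a null job, so a predicate that requires a non-null job can never be satisfied afterwards. A minimal sketch of the corrected wait (the fabric8 calls are real; "client", "namespace", "jobName", "howLong", and "unit" are illustrative placeholders):

// Sketch of the fixed wait-and-classify logic, assuming a fabric8 KubernetesClient.
Job job = client.batch()
    .v1()
    .jobs()
    .inNamespace(namespace)
    .withName(jobName)
    .waitUntilCondition(
        // Treat a null job as terminal: fabric8 passes null to the predicate
        // when the resource is deleted, and returning false there would leave
        // the thread blocked until the timeout expires.
        x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null),
        howLong,
        unit
    );
if (job == null) {
  // The job was deleted out from under us; report failure instead of hanging.
  return new JobResponse(null, PeonPhase.FAILED);
}
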
George Shiqi Wu, 2023-03-30 00:39:52 -04:00 (committed by GitHub)
parent 3bb67721f7
commit 44abe2b96f
3 changed files with 27 additions and 6 deletions


@@ -106,10 +106,13 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
        .inNamespace(namespace)
        .withName(taskId.getK8sTaskId())
        .waitUntilCondition(
-           x -> x != null && x.getStatus() != null && x.getStatus().getActive() == null,
+           x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null),
            howLong,
            unit
        );
+   if (job == null) {
+     return new JobResponse(job, PeonPhase.FAILED);
+   }
    if (job.getStatus().getSucceeded() != null) {
      return new JobResponse(job, PeonPhase.SUCCEEDED);
    }
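
With this change a deleted job surfaces promptly as a FAILED JobResponse instead of a KubernetesClientTimeoutException after the full wait. A hypothetical caller-side sketch (peonClient and taskId are illustrative; the method signature matches the tests below):

// Hypothetical caller: a deleted job now yields a FAILED response right away,
// freeing the executor thread instead of blocking until maxTaskDuration.
JobResponse response = peonClient.waitForJobCompletion(taskId, 4, TimeUnit.HOURS);
if (response.getPhase() == PeonPhase.FAILED) {
  // handle the deleted/failed job and release the task slot
}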


@@ -55,7 +55,7 @@ public class JobResponse
  {
    Optional<Long> duration = Optional.absent();
    try {
-     if (job.getStatus() != null
+     if (job != null && job.getStatus() != null
          && job.getStatus().getStartTime() != null
          && job.getStatus().getCompletionTime() != null) {
        duration = Optional.of((long) new Period(
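
For context, the guarded block computes the job's wall-clock duration from its Kubernetes start and completion timestamps. The hunk is truncated mid-expression, so the continuation below is an assumption sketched from the visible code (Joda-Time's Period and DateTime.parse are real APIs):

// Assumed continuation of the truncated expression above: measure the elapsed
// time between the job's start and completion timestamps in milliseconds.
if (job != null && job.getStatus() != null
    && job.getStatus().getStartTime() != null
    && job.getStatus().getCompletionTime() != null) {
  duration = Optional.of((long) new Period(
      DateTime.parse(job.getStatus().getStartTime()),
      DateTime.parse(job.getStatus().getCompletionTime())
  ).toStandardDuration().getMillis());
}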


@@ -30,8 +30,8 @@ import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
+import io.fabric8.kubernetes.api.model.batch.v1.JobStatusBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.junit.jupiter.api.Assertions;
@@ -60,9 +60,27 @@ public class DruidKubernetesPeonClientTest
    DruidKubernetesPeonClient client = new DruidKubernetesPeonClient(new TestKubernetesClient(this.client), "test",
        false
    );
-   Assertions.assertThrows(KubernetesClientTimeoutException.class, () -> {
-     client.waitForJobCompletion(new K8sTaskId("some-task"), 1, TimeUnit.SECONDS);
-   });
+   JobResponse jobResponse = client.waitForJobCompletion(new K8sTaskId("some-task"), 1, TimeUnit.SECONDS);
+   Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase());
+   Assertions.assertNull(jobResponse.getJob());
  }

+  @Test
+  void testWaitingForAPodToGetReadySuccess()
+  {
+    DruidKubernetesPeonClient peonClient = new DruidKubernetesPeonClient(new TestKubernetesClient(this.client), "test",
+        false
+    );
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName("sometask")
+        .endMetadata()
+        .withStatus(new JobStatusBuilder().withActive(null).withSucceeded(1).build())
+        .build();
+    client.batch().v1().jobs().inNamespace("test").create(job);
+    JobResponse jobResponse = peonClient.waitForJobCompletion(new K8sTaskId("sometask"), 1, TimeUnit.SECONDS);
+    Assertions.assertEquals(PeonPhase.SUCCEEDED, jobResponse.getPhase());
+    Assertions.assertEquals(job.getStatus().getSucceeded(), jobResponse.getJob().getStatus().getSucceeded());
+  }
+
  @Test
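
The tests above run against fabric8's Kubernetes mock server: @EnableKubernetesMockClient(crud = true) injects a KubernetesClient backed by an in-memory API server, so a Job created through the client is visible to later reads and waits. A minimal self-contained sketch of that pattern (class and test names are illustrative):

import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

// Illustrative use of the fabric8 mock server in CRUD mode.
@EnableKubernetesMockClient(crud = true)
class MockClientExampleTest
{
  KubernetesClient client; // injected by the EnableKubernetesMockClient extension

  @Test
  void jobCreatedThroughTheClientIsReadableBack()
  {
    Job job = new JobBuilder()
        .withNewMetadata().withName("sometask").endMetadata()
        .build();
    client.batch().v1().jobs().inNamespace("test").create(job);
    Assertions.assertNotNull(
        client.batch().v1().jobs().inNamespace("test").withName("sometask").get()
    );
  }
}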