Fix race condition in KubernetesTaskRunner between shutdown and getKnownTasks (#14030)

* Fix issues with null pointers on jobResponse

* fix unit tests

* Update extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>

* nullable

* fix error message

* Use jobs for known tasks instead of pods

* Remove log lines

* remove log lines

* PR change requests

* revert wait change

---------

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
This commit is contained in:
George Shiqi Wu 2023-04-10 16:27:49 -04:00 committed by GitHub
parent 2e87b5a901
commit 00d777d848
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 197 additions and 129 deletions

View File

@ -24,7 +24,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@ -52,6 +51,7 @@ import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.JobStatus;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
@ -80,6 +80,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Runs tasks as k8s jobs using the "internal peon" verb.
@ -339,7 +340,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
{
List<TaskRunnerWorkItem> result = new ArrayList<>();
for (Pod existingTask : client.listPeonPods()) {
for (Job existingTask : client.listAllPeonJobs()) {
try {
Task task = adapter.toTask(existingTask);
ListenableFuture<TaskStatus> future = run(task);
@ -426,7 +427,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
List<TaskRunnerWorkItem> result = new ArrayList<>();
for (Pod existingTask : client.listPeonPods(Sets.newHashSet(PeonPhase.RUNNING))) {
for (Job existingTask : client.listAllPeonJobs().stream().filter(JobStatus::isActive).collect(Collectors.toSet())) {
try {
Task task = adapter.toTask(existingTask);
ListenableFuture<TaskStatus> future = run(task);

View File

@ -33,10 +33,8 @@ import java.io.InputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class DruidKubernetesPeonClient implements KubernetesPeonClient
{
@ -181,23 +179,6 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
.getItems());
}
@Override
public List<Pod> listPeonPods(Set<PeonPhase> phases)
{
return listPeonPods().stream()
.filter(x -> phases.contains(PeonPhase.getPhaseFor(x)))
.collect(Collectors.toList());
}
@Override
public List<Pod> listPeonPods()
{
return clientApi.executeRequest(client -> client.pods().inNamespace(namespace)
.withLabel(DruidK8sConstants.LABEL_KEY)
.list().getItems());
}
@Override
public int cleanCompletedJobsOlderThan(long howFarBack, TimeUnit timeUnit)
{

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.common;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
/**
 * Static helpers for interpreting the pod counters reported in a Kubernetes {@link Job}'s status.
 *
 * <p>Every predicate is null-safe: a {@code null} job, a job with no status, or a status whose
 * relevant counter is unset all yield {@code false}.
 */
public final class JobStatus
{
  private JobStatus()
  {
    // Holder for static helpers only; never instantiated.
  }

  /**
   * @param job the job to inspect, may be {@code null}
   * @return {@code true} only when the job's status reports an active count greater than zero
   */
  public static boolean isActive(Job job)
  {
    return job != null && job.getStatus() != null && isPositive(job.getStatus().getActive());
  }

  /**
   * @param job the job to inspect, may be {@code null}
   * @return {@code true} only when the job's status reports a succeeded count greater than zero
   */
  public static boolean isSucceeded(Job job)
  {
    return job != null && job.getStatus() != null && isPositive(job.getStatus().getSucceeded());
  }

  /**
   * @param job the job to inspect, may be {@code null}
   * @return {@code true} only when the job's status reports a failed count greater than zero
   */
  public static boolean isFailed(Job job)
  {
    return job != null && job.getStatus() != null && isPositive(job.getStatus().getFailed());
  }

  /**
   * Shared counter check: an unset ({@code null}) counter is treated the same as zero.
   */
  private static boolean isPositive(Integer count)
  {
    return count != null && count > 0;
  }
}

View File

@ -118,10 +118,9 @@ public abstract class K8sTaskAdapter implements TaskAdapter
}
@Override
public Task toTask(Pod from) throws IOException
public Task toTask(Job from) throws IOException
{
// all i have to do here is grab the main container...done
PodSpec podSpec = from.getSpec();
PodSpec podSpec = from.getSpec().getTemplate().getSpec();
massageSpec(podSpec, "main");
List<EnvVar> envVars = podSpec.getContainers().get(0).getEnv();
Optional<EnvVar> taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst();

View File

@ -25,7 +25,6 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@ -48,10 +47,6 @@ public interface KubernetesPeonClient
List<Job> listAllPeonJobs();
List<Pod> listPeonPods(Set<PeonPhase> phases);
List<Pod> listPeonPods();
int cleanCompletedJobsOlderThan(long howFarBack, TimeUnit timeUnit);
Pod getMainJobPod(K8sTaskId taskId);

View File

@ -155,15 +155,15 @@ public class PodTemplateTaskAdapter implements TaskAdapter
* @throws IOException
*/
@Override
public Task toTask(Pod from) throws IOException
public Task toTask(Job from) throws IOException
{
Map<String, String> annotations = from.getMetadata().getAnnotations();
Map<String, String> annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
if (annotations == null) {
throw new IOE("No annotations found on pod [%s]", from.getMetadata().getName());
throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
}
String task = annotations.get(DruidK8sConstants.TASK);
if (task == null) {
throw new IOE("No task annotation found on pod [%s]", from.getMetadata().getName());
throw new IOE("No task annotation found on pod spec for job [%s]", from.getMetadata().getName());
}
return mapper.readValue(Base64Compression.decompressBase64(task), Task.class);
}

View File

@ -19,7 +19,6 @@
package org.apache.druid.k8s.overlord.common;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.indexing.common.task.Task;
@ -30,6 +29,6 @@ public interface TaskAdapter
Job fromTask(Task task) throws IOException;
Task toTask(Pod from) throws IOException;
Task toTask(Job from) throws IOException;
}

View File

@ -262,11 +262,11 @@ public class KubernetesTaskRunnerTest
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
when(adapter.fromTask(eq(task))).thenReturn(job);
when(adapter.toTask(eq(peonPod))).thenReturn(task);
when(adapter.toTask(eq(job))).thenReturn(task);
DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
when(peonClient.listPeonPods()).thenReturn(Collections.singletonList(peonPod));
when(peonClient.listAllPeonJobs()).thenReturn(Collections.singletonList(job));
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job));
when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod);
when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
@ -325,11 +325,11 @@ public class KubernetesTaskRunnerTest
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
when(adapter.fromTask(eq(task))).thenReturn(job);
when(adapter.toTask(eq(peonPod))).thenReturn(task);
when(adapter.toTask(eq(job))).thenReturn(task);
DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
when(peonClient.listPeonPods()).thenReturn(Collections.singletonList(peonPod));
when(peonClient.listAllPeonJobs()).thenReturn(Collections.singletonList(job));
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job));
when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod);
when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
@ -720,7 +720,7 @@ public class KubernetesTaskRunnerTest
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
when(adapter.fromTask(eq(task))).thenReturn(job);
when(adapter.toTask(eq(peonPod))).thenReturn(task);
when(adapter.toTask(eq(job))).thenReturn(task);
DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);

View File

@ -23,8 +23,6 @@ import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@ -104,24 +102,6 @@ public class DruidKubernetesPeonClientTest
Assertions.assertEquals(1, currentJobs.size());
}
@Test
void testListPeonPods()
{
Pod pod = new PodBuilder()
.withNewMetadata()
.withName("foo")
.addToLabels(DruidK8sConstants.LABEL_KEY, "true")
.endMetadata()
.withSpec(K8sTestUtils.getDummyPodSpec())
.build();
client.pods().inNamespace("test").create(pod);
DruidKubernetesPeonClient peonClient = new DruidKubernetesPeonClient(new TestKubernetesClient(this.client), "test",
false
);
List<Pod> pods = peonClient.listPeonPods();
Assertions.assertEquals(1, pods.size());
}
@Test
void testCleanup() throws KubernetesResourceNotFoundException
{

View File

@ -161,7 +161,7 @@ public class DruidPeonClientIntegrationTest
thread.start();
// assert that the env variable is correct
Task taskFromEnvVar = adapter.toTask(peonClient.getMainJobPod(new K8sTaskId(task.getId())));
Task taskFromEnvVar = adapter.toTask(job);
assertEquals(task, taskFromEnvVar);
// now copy the task.json file from the pod and make sure it's the same as the task.json we expected

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.common;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.JobStatusBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
 * Checks each {@code JobStatus} predicate against: a null job, a job with no status,
 * a status with the counter unset, a zero counter, and a positive counter.
 */
public class JobStatusTest
{
  @Test
  void testJobsActive()
  {
    Assertions.assertFalse(JobStatus.isActive(null));
    Assertions.assertFalse(JobStatus.isActive(new JobBuilder().build()));
    Assertions.assertFalse(activeWithCount(null));
    Assertions.assertFalse(activeWithCount(0));
    Assertions.assertTrue(activeWithCount(1));
  }

  @Test
  void testJobsSucceeded()
  {
    Assertions.assertFalse(JobStatus.isSucceeded(null));
    Assertions.assertFalse(JobStatus.isSucceeded(new JobBuilder().build()));
    Assertions.assertFalse(succeededWithCount(null));
    Assertions.assertFalse(succeededWithCount(0));
    Assertions.assertTrue(succeededWithCount(1));
  }

  @Test
  void testJobsFailed()
  {
    Assertions.assertFalse(JobStatus.isFailed(null));
    Assertions.assertFalse(JobStatus.isFailed(new JobBuilder().build()));
    Assertions.assertFalse(failedWithCount(null));
    Assertions.assertFalse(failedWithCount(0));
    Assertions.assertTrue(failedWithCount(1));
  }

  // Builds a job whose status carries the given active counter and evaluates isActive on it.
  private static boolean activeWithCount(Integer count)
  {
    return JobStatus.isActive(
        new JobBuilder().withStatus(new JobStatusBuilder().withActive(count).build()).build()
    );
  }

  // Builds a job whose status carries the given succeeded counter and evaluates isSucceeded on it.
  private static boolean succeededWithCount(Integer count)
  {
    return JobStatus.isSucceeded(
        new JobBuilder().withStatus(new JobStatusBuilder().withSucceeded(count).build()).build()
    );
  }

  // Builds a job whose status carries the given failed counter and evaluates isFailed on it.
  private static boolean failedWithCount(Integer count)
  {
    return JobStatus.isFailed(
        new JobBuilder().withStatus(new JobStatusBuilder().withFailed(count).build()).build()
    );
  }
}

View File

@ -30,11 +30,10 @@ import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.commons.lang.StringUtils;
@ -163,22 +162,19 @@ class K8sTaskAdapterTest
task,
new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/"))
);
// cant launch jobs with test server, we have to hack around this.
Pod pod = K8sTestUtils.createPodFromJob(jobFromSpec);
client.pods().inNamespace("test").create(pod);
PodList podList = client.pods().inNamespace("test").list();
assertEquals(1, podList.getItems().size());
client.batch().v1().jobs().inNamespace("test").create(jobFromSpec);
JobList jobList = client.batch().v1().jobs().inNamespace("test").list();
assertEquals(1, jobList.getItems().size());
// assert that the size of the pod is 1g
Pod myPod = Iterables.getOnlyElement(podList.getItems());
Quantity containerMemory = myPod.getSpec().getContainers().get(0).getResources().getLimits().get("memory");
Job myJob = Iterables.getOnlyElement(jobList.getItems());
Quantity containerMemory = myJob.getSpec().getTemplate().getSpec().getContainers().get(0).getResources().getLimits().get("memory");
String amount = containerMemory.getAmount();
assertEquals(2400000000L, Long.valueOf(amount));
assertTrue(StringUtils.isBlank(containerMemory.getFormat())); // no units specified we talk in bytes
Task taskFromPod = adapter.toTask(Iterables.getOnlyElement(podList.getItems()));
assertEquals(task, taskFromPod);
Task taskFromJob = adapter.toTask(Iterables.getOnlyElement(jobList.getItems()));
assertEquals(task, taskFromJob);
}
@Test

View File

@ -21,10 +21,9 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.NoopTask;
@ -271,9 +270,10 @@ public class PodTemplateTaskAdapterTest
props
);
Pod pod = K8sTestUtils.fileToResource("basePodWithoutAnnotations.yaml", Pod.class);
Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class);
Assert.assertThrows(IOE.class, () -> adapter.toTask(pod));
Assert.assertThrows(IOE.class, () -> adapter.toTask(job));
}
@Test
@ -293,15 +293,18 @@ public class PodTemplateTaskAdapterTest
props
);
Pod basePod = K8sTestUtils.fileToResource("basePodWithoutAnnotations.yaml", Pod.class);
Job baseJob = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class);
Pod pod = new PodBuilder(basePod)
Job job = new JobBuilder(baseJob)
.editSpec()
.editTemplate()
.editMetadata()
.addToAnnotations(Collections.emptyMap())
.endMetadata()
.endTemplate()
.endSpec()
.build();
Assert.assertThrows(IOE.class, () -> adapter.toTask(pod));
Assert.assertThrows(IOE.class, () -> adapter.toTask(job));
}
@Test
@ -321,9 +324,8 @@ public class PodTemplateTaskAdapterTest
props
);
Pod pod = K8sTestUtils.fileToResource("basePod.yaml", Pod.class);
Task actual = adapter.toTask(pod);
Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class);
Task actual = adapter.toTask(job);
Task expected = NoopTask.create("id", 1);
Assertions.assertEquals(expected, actual);

View File

@ -0,0 +1,27 @@
apiVersion: batch/v1
kind: Job
metadata:
name: "id"
spec:
template:
metadata:
labels:
job-name: "id"
druid.k8s.peons: "true"
annotations:
task: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA="
name: id-kmwkw
spec:
containers:
- command:
- sleep
- "3600"
env:
- name: "TASK_DIR"
value: "/tmp/id"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
image: one
name: primary

View File

@ -0,0 +1,24 @@
apiVersion: batch/v1
kind: Job
metadata:
name: "id"
spec:
template:
metadata:
labels:
job-name: id
name: id-kmwkw
spec:
containers:
- command:
- sleep
- "3600"
env:
- name: "TASK_DIR"
value: "/tmp/id"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
image: one
name: primary

View File

@ -1,23 +0,0 @@
apiVersion: v1
kind: Pod
metadata:
name: "id-kmwkw"
labels:
job-name: "id"
druid.k8s.peons: "true"
annotations:
task: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA="
spec:
containers:
- command:
- sleep
- "3600"
env:
- name: "TASK_DIR"
value: "/tmp/id"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
image: one
name: primary

View File

@ -1,21 +0,0 @@
apiVersion: v1
kind: Pod
metadata:
name: "id-kmwkw"
labels:
job-name: "id"
annotations:
spec:
containers:
- command:
- sleep
- "3600"
env:
- name: "TASK_DIR"
value: "/tmp/id"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
image: one
name: primary