diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 00f9c2dfd9c..2569b40b2ad 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -52,6 +51,7 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; import org.apache.druid.k8s.overlord.common.JobResponse; +import org.apache.druid.k8s.overlord.common.JobStatus; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException; @@ -80,6 +80,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Runs tasks as k8s jobs using the "internal peon" verb. @@ -339,7 +340,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner public Collection getKnownTasks() { List result = new ArrayList<>(); - for (Pod existingTask : client.listPeonPods()) { + for (Job existingTask : client.listAllPeonJobs()) { try { Task task = adapter.toTask(existingTask); ListenableFuture future = run(task); @@ -426,7 +427,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner public Collection getRunningTasks() { List result = new ArrayList<>(); - for (Pod existingTask : client.listPeonPods(Sets.newHashSet(PeonPhase.RUNNING))) { + for (Job existingTask : client.listAllPeonJobs().stream().filter(JobStatus::isActive).collect(Collectors.toSet())) { try { Task task = adapter.toTask(existingTask); ListenableFuture future = run(task); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java index 9f21b83bc29..336c9910158 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java @@ -33,10 +33,8 @@ import java.io.InputStream; import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; public class DruidKubernetesPeonClient implements KubernetesPeonClient { @@ -181,23 +179,6 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient .getItems()); } - @Override - public List listPeonPods(Set phases) - { - return listPeonPods().stream() - .filter(x -> phases.contains(PeonPhase.getPhaseFor(x))) - .collect(Collectors.toList()); - } - - @Override - public List listPeonPods() - { - return clientApi.executeRequest(client -> client.pods().inNamespace(namespace) - .withLabel(DruidK8sConstants.LABEL_KEY) - .list().getItems()); - - } - @Override public int cleanCompletedJobsOlderThan(long howFarBack, TimeUnit timeUnit) { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobStatus.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobStatus.java new file mode 100644 index 00000000000..d6222246823 --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobStatus.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.batch.v1.Job; + +public class JobStatus +{ + + public static boolean isActive(Job job) + { + if (job == null || job.getStatus() == null || job.getStatus().getActive() == null) { + return false; + } + return job.getStatus().getActive() > 0; + } + + public static boolean isSucceeded(Job job) + { + if (job == null || job.getStatus() == null || job.getStatus().getSucceeded() == null) { + return false; + } + return job.getStatus().getSucceeded() > 0; + } + + public static boolean isFailed(Job job) + { + if (job == null || job.getStatus() == null || job.getStatus().getFailed() == null) { + return false; + } + return job.getStatus().getFailed() > 0; + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java index 4bbecab7dd0..f327468feaa 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java @@ -118,10 +118,9 @@ public abstract class K8sTaskAdapter implements TaskAdapter } @Override - public Task toTask(Pod from) throws IOException + public Task toTask(Job from) throws IOException { - // all i have to do here is grab the main container...done - PodSpec podSpec = from.getSpec(); + PodSpec podSpec = from.getSpec().getTemplate().getSpec(); massageSpec(podSpec, "main"); List envVars = podSpec.getContainers().get(0).getEnv(); Optional taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java index f04e1517b05..f9e402f94e2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java @@ -25,7 +25,6 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job; import java.io.InputStream; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -48,10 +47,6 @@ public interface KubernetesPeonClient List listAllPeonJobs(); - List listPeonPods(Set phases); - - List listPeonPods(); - int cleanCompletedJobsOlderThan(long howFarBack, TimeUnit timeUnit); Pod getMainJobPod(K8sTaskId taskId); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java index 9765b0b8350..4952c4f09f5 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapter.java @@ -155,15 +155,15 @@ public class PodTemplateTaskAdapter implements TaskAdapter * @throws IOException */ @Override - public Task toTask(Pod from) throws IOException + public Task toTask(Job from) throws IOException { - Map annotations = from.getMetadata().getAnnotations(); + Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); if (annotations == null) { - throw new IOE("No annotations found on pod [%s]", from.getMetadata().getName()); + throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); } String task = annotations.get(DruidK8sConstants.TASK); if (task == null) { - throw new IOE("No task annotation found on pod [%s]", from.getMetadata().getName()); + throw new IOE("No task annotation found on pod spec for job [%s]", from.getMetadata().getName()); } return mapper.readValue(Base64Compression.decompressBase64(task), Task.class); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java index 00263fdc3b9..a58240e40ae 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/TaskAdapter.java @@ -19,7 +19,6 @@ package org.apache.druid.k8s.overlord.common; -import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.batch.v1.Job; import org.apache.druid.indexing.common.task.Task; @@ -30,6 +29,6 @@ public interface TaskAdapter Job fromTask(Task task) throws IOException; - Task toTask(Pod from) throws IOException; + Task toTask(Job from) throws IOException; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 256619812e3..6c651f0cbde 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -262,11 +262,11 @@ public class KubernetesTaskRunnerTest K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); when(adapter.fromTask(eq(task))).thenReturn(job); - when(adapter.toTask(eq(peonPod))).thenReturn(task); + when(adapter.toTask(eq(job))).thenReturn(task); DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); - when(peonClient.listPeonPods()).thenReturn(Collections.singletonList(peonPod)); + when(peonClient.listAllPeonJobs()).thenReturn(Collections.singletonList(job)); when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job)); when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); @@ -325,11 +325,11 @@ public class KubernetesTaskRunnerTest K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); when(adapter.fromTask(eq(task))).thenReturn(job); - when(adapter.toTask(eq(peonPod))).thenReturn(task); + when(adapter.toTask(eq(job))).thenReturn(task); DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); - when(peonClient.listPeonPods()).thenReturn(Collections.singletonList(peonPod)); + when(peonClient.listAllPeonJobs()).thenReturn(Collections.singletonList(job)); when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job)); when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); @@ -720,7 +720,7 @@ public class KubernetesTaskRunnerTest K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); when(adapter.fromTask(eq(task))).thenReturn(job); - when(adapter.toTask(eq(peonPod))).thenReturn(task); + when(adapter.toTask(eq(job))).thenReturn(task); DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java index b864c730dff..ebf7d8c0a44 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java @@ -23,8 +23,6 @@ import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import io.fabric8.kubernetes.api.model.ObjectMeta; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodListBuilder; import io.fabric8.kubernetes.api.model.PodTemplateSpec; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -104,24 +102,6 @@ public class DruidKubernetesPeonClientTest Assertions.assertEquals(1, currentJobs.size()); } - @Test - void testListPeonPods() - { - Pod pod = new PodBuilder() - .withNewMetadata() - .withName("foo") - .addToLabels(DruidK8sConstants.LABEL_KEY, "true") - .endMetadata() - .withSpec(K8sTestUtils.getDummyPodSpec()) - .build(); - client.pods().inNamespace("test").create(pod); - DruidKubernetesPeonClient peonClient = new DruidKubernetesPeonClient(new TestKubernetesClient(this.client), "test", - false - ); - List pods = peonClient.listPeonPods(); - Assertions.assertEquals(1, pods.size()); - } - @Test void testCleanup() throws KubernetesResourceNotFoundException { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java index f82a82d78b4..471c201740a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidPeonClientIntegrationTest.java @@ -161,7 +161,7 @@ public class DruidPeonClientIntegrationTest thread.start(); // assert that the env variable is corret - Task taskFromEnvVar = adapter.toTask(peonClient.getMainJobPod(new K8sTaskId(task.getId()))); + Task taskFromEnvVar = adapter.toTask(job); assertEquals(task, taskFromEnvVar); // now copy the task.json file from the pod and make sure its the same as our task.json we expected diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobStatusTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobStatusTest.java new file mode 100644 index 00000000000..e380f275956 --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobStatusTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; +import io.fabric8.kubernetes.api.model.batch.v1.JobStatusBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class JobStatusTest +{ + @Test + void testJobsActive() + { + Assertions.assertFalse(JobStatus.isActive(null)); + Assertions.assertFalse(JobStatus.isActive(new JobBuilder().build())); + Assertions.assertFalse(JobStatus.isActive(new JobBuilder().withStatus(new JobStatusBuilder().withActive(null).build()).build())); + Assertions.assertFalse(JobStatus.isActive(new JobBuilder().withStatus(new JobStatusBuilder().withActive(0).build()).build())); + Assertions.assertTrue(JobStatus.isActive(new JobBuilder().withStatus(new JobStatusBuilder().withActive(1).build()).build())); + } + + @Test + void testJobsSucceeded() + { + Assertions.assertFalse(JobStatus.isSucceeded(null)); + Assertions.assertFalse(JobStatus.isSucceeded(new JobBuilder().build())); + Assertions.assertFalse(JobStatus.isSucceeded(new JobBuilder().withStatus(new JobStatusBuilder().withSucceeded(null).build()).build())); + Assertions.assertFalse(JobStatus.isSucceeded(new JobBuilder().withStatus(new JobStatusBuilder().withSucceeded(0).build()).build())); + Assertions.assertTrue(JobStatus.isSucceeded(new JobBuilder().withStatus(new JobStatusBuilder().withSucceeded(1).build()).build())); + } + + @Test + void testJobsFailed() + { + Assertions.assertFalse(JobStatus.isFailed(null)); + Assertions.assertFalse(JobStatus.isFailed(new JobBuilder().build())); + Assertions.assertFalse(JobStatus.isFailed(new JobBuilder().withStatus(new JobStatusBuilder().withFailed(null).build()).build())); + Assertions.assertFalse(JobStatus.isFailed(new JobBuilder().withStatus(new JobStatusBuilder().withFailed(0).build()).build())); + Assertions.assertTrue(JobStatus.isFailed(new JobBuilder().withStatus(new JobStatusBuilder().withFailed(1).build()).build())); + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java index cc6c6fec436..2c59a5df220 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java @@ -30,11 +30,10 @@ import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; import io.fabric8.kubernetes.api.model.EnvVar; import io.fabric8.kubernetes.api.model.EnvVarBuilder; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.Quantity; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobList; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import org.apache.commons.lang.StringUtils; @@ -163,22 +162,19 @@ class K8sTaskAdapterTest task, new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/")) ); - - // cant launch jobs with test server, we have to hack around this. - Pod pod = K8sTestUtils.createPodFromJob(jobFromSpec); - client.pods().inNamespace("test").create(pod); - PodList podList = client.pods().inNamespace("test").list(); - assertEquals(1, podList.getItems().size()); + client.batch().v1().jobs().inNamespace("test").create(jobFromSpec); + JobList jobList = client.batch().v1().jobs().inNamespace("test").list(); + assertEquals(1, jobList.getItems().size()); // assert that the size of the pod is 1g - Pod myPod = Iterables.getOnlyElement(podList.getItems()); - Quantity containerMemory = myPod.getSpec().getContainers().get(0).getResources().getLimits().get("memory"); + Job myJob = Iterables.getOnlyElement(jobList.getItems()); + Quantity containerMemory = myJob.getSpec().getTemplate().getSpec().getContainers().get(0).getResources().getLimits().get("memory"); String amount = containerMemory.getAmount(); assertEquals(2400000000L, Long.valueOf(amount)); assertTrue(StringUtils.isBlank(containerMemory.getFormat())); // no units specified we talk in bytes - Task taskFromPod = adapter.toTask(Iterables.getOnlyElement(podList.getItems())); - assertEquals(task, taskFromPod); + Task taskFromJob = adapter.toTask(Iterables.getOnlyElement(jobList.getItems())); + assertEquals(task, taskFromJob); } @Test diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java index 15c911d850e..f800db16eb3 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PodTemplateTaskAdapterTest.java @@ -21,10 +21,9 @@ package org.apache.druid.k8s.overlord.common; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodTemplate; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.NoopTask; @@ -271,9 +270,10 @@ public class PodTemplateTaskAdapterTest props ); - Pod pod = K8sTestUtils.fileToResource("basePodWithoutAnnotations.yaml", Pod.class); + Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class); - Assert.assertThrows(IOE.class, () -> adapter.toTask(pod)); + + Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); } @Test @@ -293,15 +293,18 @@ public class PodTemplateTaskAdapterTest props ); - Pod basePod = K8sTestUtils.fileToResource("basePodWithoutAnnotations.yaml", Pod.class); + Job baseJob = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class); - Pod pod = new PodBuilder(basePod) + Job job = new JobBuilder(baseJob) + .editSpec() + .editTemplate() .editMetadata() .addToAnnotations(Collections.emptyMap()) .endMetadata() + .endTemplate() + .endSpec() .build(); - - Assert.assertThrows(IOE.class, () -> adapter.toTask(pod)); + Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); } @Test @@ -321,9 +324,8 @@ public class PodTemplateTaskAdapterTest props ); - Pod pod = K8sTestUtils.fileToResource("basePod.yaml", Pod.class); - - Task actual = adapter.toTask(pod); + Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class); + Task actual = adapter.toTask(job); Task expected = NoopTask.create("id", 1); Assertions.assertEquals(expected, actual); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/baseJob.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/baseJob.yaml new file mode 100644 index 00000000000..eb5f46cb3e2 --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/baseJob.yaml @@ -0,0 +1,27 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: "id" +spec: + template: + metadata: + labels: + job-name: "id" + druid.k8s.peons: "true" + annotations: + task: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA=" + name: id-kmwkw + spec: + containers: + - command: + - sleep + - "3600" + env: + - name: "TASK_DIR" + value: "/tmp/id" + - name: "TASK_JSON" + valueFrom: + fieldRef: + fieldPath: "metadata.annotations['task']" + image: one + name: primary diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/baseJobWithoutAnnotations.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/baseJobWithoutAnnotations.yaml new file mode 100644 index 00000000000..9b1ad233fce --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/baseJobWithoutAnnotations.yaml @@ -0,0 +1,24 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: "id" +spec: + template: + metadata: + labels: + job-name: id + name: id-kmwkw + spec: + containers: + - command: + - sleep + - "3600" + env: + - name: "TASK_DIR" + value: "/tmp/id" + - name: "TASK_JSON" + valueFrom: + fieldRef: + fieldPath: "metadata.annotations['task']" + image: one + name: primary \ No newline at end of file diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml deleted file mode 100644 index 5c8c4f7855e..00000000000 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePod.yaml +++ /dev/null @@ -1,23 +0,0 @@ -apiVersion: v1 -kind: Pod -metadata: - name: "id-kmwkw" - labels: - job-name: "id" - druid.k8s.peons: "true" - annotations: - task: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA=" -spec: - containers: - - command: - - sleep - - "3600" - env: - - name: "TASK_DIR" - value: "/tmp/id" - - name: "TASK_JSON" - valueFrom: - fieldRef: - fieldPath: "metadata.annotations['task']" - image: one - name: primary \ No newline at end of file diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml deleted file mode 100644 index c56ba7c9b6a..00000000000 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/basePodWithoutAnnotations.yaml +++ /dev/null @@ -1,21 +0,0 @@ -apiVersion: v1 -kind: Pod -metadata: - name: "id-kmwkw" - labels: - job-name: "id" - annotations: -spec: - containers: - - command: - - sleep - - "3600" - env: - - name: "TASK_DIR" - value: "/tmp/id" - - name: "TASK_JSON" - valueFrom: - fieldRef: - fieldPath: "metadata.annotations['task']" - image: one - name: primary \ No newline at end of file