diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index f3f43832e1b..178ec91e29b 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -56,6 +56,7 @@ Additional Configuration |--------|---------------|-----------|-------|--------| |`druid.indexer.runner.debugJobs`|`boolean`|Clean up K8s jobs after tasks complete.|False|No| |`druid.indexer.runner.sidecarSupport`|`boolean`|If your overlord pod has sidecars, this will attempt to start the task with the same sidecars as the overlord pod.|False|No| +|`druid.indexer.runner.primaryContainerName`|`String`|If running with sidecars, the `primaryContainerName` should be that of your druid container like `druid-overlord`.|First container in `podSpec` list|No| |`druid.indexer.runner.kubexitImage`|`String`|Used kubexit project to help shutdown sidecars when the main pod completes. Otherwise jobs with sidecars never terminate.|karlkfi/kubexit:latest|No| |`druid.indexer.runner.disableClientProxy`|`boolean`|Use this if you have a global http(s) proxy and you wish to bypass it.|false|No| |`druid.indexer.runner.maxTaskDuration`|`Duration`|Max time a task is allowed to run for before getting killed|`PT4H`|No| diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index d71a0f1970a..82d57d60987 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -41,6 +41,13 @@ public class KubernetesTaskRunnerConfig @JsonProperty public boolean sidecarSupport = false; + @JsonProperty + // if this is not set, then the first container in your pod spec is assumed to be the overlord container. + // usually this is fine, but when you are dynamically adding sidecars like istio, the service mesh could + // in fact place the istio-proxy container as the first container. Thus you would specify this value to + // the name of your primary container. eg) druid-overlord + public String primaryContainerName = null; + @JsonProperty // for multi-container jobs, we need this image to shut down sidecars after the main container // has completed diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java index e6fe2eaa84d..2aebc37aba1 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java @@ -167,6 +167,7 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient .jobs() .inNamespace(namespace) .withName(taskId.getK8sTaskId()) + .inContainer("main") .getLogReader(); if (reader == null) { return Optional.absent(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java index fcae777ceba..b0672113996 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapter.java @@ -88,13 +88,17 @@ public abstract class K8sTaskAdapter implements TaskAdapter { String myPodName = System.getenv("HOSTNAME"); Pod pod = client.executeRequest(client -> client.pods().inNamespace(config.namespace).withName(myPodName).get()); - return createJobFromPodSpec(pod.getSpec(), task, context); + PodSpec podSpec = pod.getSpec(); + massageSpec(podSpec, config.primaryContainerName); + return createJobFromPodSpec(podSpec, task, context); } @Override public Task toTask(Pod from) throws IOException { - List envVars = from.getSpec().getContainers().get(0).getEnv(); + PodSpec podSpec = from.getSpec(); + massageSpec(podSpec, "main"); + List envVars = podSpec.getContainers().get(0).getEnv(); Optional taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst(); String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null); if (contents == null) { @@ -104,12 +108,13 @@ public abstract class K8sTaskAdapter implements TaskAdapter } @VisibleForTesting - public abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException; + abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException; protected Job buildJob( K8sTaskId k8sTaskId, Map labels, - Map annotations, PodTemplateSpec podTemplate + Map annotations, + PodTemplateSpec podTemplate ) { return new JobBuilder() @@ -275,4 +280,28 @@ public abstract class K8sTaskAdapter implements TaskAdapter return podTemplate; } + @VisibleForTesting + static void massageSpec(PodSpec spec, String primaryContainerName) + { + // find the primary container and make it first, + if (StringUtils.isNotBlank(primaryContainerName)) { + int i = 0; + while (i < spec.getContainers().size()) { + if (primaryContainerName.equals(spec.getContainers().get(i).getName())) { + break; + } + i++; + } + // if the primaryContainer is not found, assume the primary container is the first container. + if (i >= spec.getContainers().size()) { + throw new IllegalArgumentException("Could not find container named: " + + primaryContainerName + + " in PodSpec"); + } + Container primary = spec.getContainers().get(i); + spec.getContainers().remove(i); + spec.getContainers().add(0, primary); + } + } + } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java index b3f4fb104da..df890ede6d1 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapter.java @@ -54,7 +54,7 @@ public class MultiContainerTaskAdapter extends K8sTaskAdapter } @Override - public Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException + Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException { K8sTaskId k8sTaskId = new K8sTaskId(task.getId()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java index a86fed2fc2e..c6e9d049f6c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SingleContainerTaskAdapter.java @@ -42,7 +42,7 @@ public class SingleContainerTaskAdapter extends K8sTaskAdapter } @Override - public Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException + Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException { K8sTaskId k8sTaskId = new K8sTaskId(task.getId()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java index fede19b490f..59ef5b430c1 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskAdapterTest.java @@ -24,8 +24,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.Quantity; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.client.KubernetesClient; @@ -38,6 +41,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.io.File; @@ -160,4 +164,45 @@ class K8sTaskAdapterTest expected = (long) ((HumanReadableBytes.parse("512m") + HumanReadableBytes.parse("1g")) * 1.2); assertEquals(expected, K8sTaskAdapter.getContainerMemory(context)); } + + @Test + void testMassagingSpec() + { + PodSpec spec = new PodSpec(); + List containers = new ArrayList<>(); + containers.add(new ContainerBuilder() + .withName("secondary").build()); + containers.add(new ContainerBuilder() + .withName("sidecar").build()); + containers.add(new ContainerBuilder() + .withName("primary").build()); + spec.setContainers(containers); + K8sTaskAdapter.massageSpec(spec, "primary"); + + List actual = spec.getContainers(); + Assertions.assertEquals(3, containers.size()); + Assertions.assertEquals("primary", actual.get(0).getName()); + Assertions.assertEquals("secondary", actual.get(1).getName()); + Assertions.assertEquals("sidecar", actual.get(2).getName()); + } + + @Test + void testNoPrimaryFound() + { + PodSpec spec = new PodSpec(); + List containers = new ArrayList<>(); + containers.add(new ContainerBuilder() + .withName("istio-proxy").build()); + containers.add(new ContainerBuilder() + .withName("main").build()); + containers.add(new ContainerBuilder() + .withName("sidecar").build()); + spec.setContainers(containers); + + + Assertions.assertThrows(IllegalArgumentException.class, () -> { + K8sTaskAdapter.massageSpec(spec, "primary"); + }); + } + } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java index afce9061f61..f1c7b390de5 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/MultiContainerTaskAdapterTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; @@ -104,4 +105,50 @@ class MultiContainerTaskAdapterTest Assertions.assertEquals(expected, actual); } + @Test + public void testMultiContainerSupportWithNamedContainer() throws IOException + { + TestKubernetesClient testClient = new TestKubernetesClient(client); + Pod pod = client.pods().load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpecOrder.yaml")).get(); + KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig(); + config.namespace = "test"; + config.primaryContainerName = "primary"; + MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient, config, jsonMapper); + NoopTask task = NoopTask.create("id", 1); + PodSpec spec = pod.getSpec(); + K8sTaskAdapter.massageSpec(spec, "primary"); + Job actual = adapter.createJobFromPodSpec( + spec, + task, + new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"), + new ArrayList<>(), + new File("/tmp") + ) + ); + Job expected = client.batch() + .v1() + .jobs() + .load(this.getClass().getClassLoader().getResourceAsStream("expectedMultiContainerOutputOrder.yaml")) + .get(); + + // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results, + // this would never happen in real life, but for the jdk 17 tests this is a problem + // could be related to: https://bugs.openjdk.org/browse/JDK-8081450 + actual.getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0) + .getEnv() + .removeIf(x -> x.getName().equals("TASK_JSON")); + expected.getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0) + .getEnv() + .removeIf(x -> x.getName().equals("TASK_JSON")); + Assertions.assertEquals(expected, actual); + } + } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml new file mode 100644 index 00000000000..480f3afeb91 --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml @@ -0,0 +1,108 @@ +apiVersion: "batch/v1" +kind: "Job" +metadata: + annotations: + task.id: "id" + tls.enabled: "false" + labels: + druid.k8s.peons: "true" + name: "id" +spec: + activeDeadlineSeconds: 14400 + backoffLimit: 0 + template: + metadata: + annotations: + task.id: "id" + tls.enabled: "false" + labels: + druid.k8s.peons: "true" + spec: + hostname: "id" + containers: + - args: + - "/kubexit/kubexit /bin/sh -c \"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1\"" + command: + - "/bin/sh" + - "-c" + env: + - name: "TASK_DIR" + value: "/tmp" + - name: "TASK_JSON" + value: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA=" + - name: "JAVA_OPTS" + value: "" + - name: "druid_host" + valueFrom: + fieldRef: + fieldPath: "status.podIP" + - name: "HOSTNAME" + valueFrom: + fieldRef: + fieldPath: "metadata.name" + - name: "KUBEXIT_NAME" + value: "main" + - name: "KUBEXIT_GRAVEYARD" + value: "/graveyard" + + image: "one" + name: "main" + ports: + - containerPort: 8091 + name: "druid-tls-port" + protocol: "TCP" + - containerPort: 8100 + name: "druid-port" + protocol: "TCP" + resources: + limits: + cpu: "1000m" + memory: "2400000000" + requests: + cpu: "1000m" + memory: "2400000000" + volumeMounts: + - mountPath: "/graveyard" + name: "graveyard" + - mountPath: "/kubexit" + name: "kubexit" + - args: + - "/kubexit/kubexit /bin/sh -c \"/bin/sidekick -loggingEnabled=true -platform=platform\ + \ -splunkCluster=cluster -splunkIndexName=druid -splunkSourceType=json -splunkWorkingDir=/opt/splunkforwarder\ + \ -dataCenter=dc -environment=env -application=druid -instance=instance\ + \ -logFiles=/logs/druid/*.log\" || true" + command: + - "/bin/sh" + - "-c" + env: + - name: "KUBEXIT_NAME" + value: "sidecar" + - name: "KUBEXIT_GRAVEYARD" + value: "/graveyard" + - name: "KUBEXIT_DEATH_DEPS" + value: "main" + image: "two" + name: "sidecar" + volumeMounts: + - mountPath: "/graveyard" + name: "graveyard" + - mountPath: "/kubexit" + name: "kubexit" + initContainers: + - command: + - "cp" + - "/bin/kubexit" + - "/kubexit/kubexit" + image: "karlkfi/kubexit:v0.3.2" + name: "kubexit" + volumeMounts: + - mountPath: "/kubexit" + name: "kubexit" + restartPolicy: "Never" + volumes: + - emptyDir: + medium: "Memory" + name: "graveyard" + - emptyDir: {} + name: "kubexit" + ttlSecondsAfterFinished: 172800 diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/multiContainerPodSpecOrder.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/multiContainerPodSpecOrder.yaml new file mode 100644 index 00000000000..31ff81ccacb --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/multiContainerPodSpecOrder.yaml @@ -0,0 +1,26 @@ +apiVersion: v1 +kind: Pod +metadata: + name: test +spec: + containers: + - image: two + name: sidecar + args: + - -loggingEnabled=true + - -platform=platform + - -splunkCluster=cluster + - -splunkIndexName=druid + - -splunkSourceType=json + - -splunkWorkingDir=/opt/splunkforwarder + - -dataCenter=dc + - -environment=env + - -application=druid + - -instance=instance + - -logFiles=/logs/druid/*.log + command: + - /bin/sidekick + - image: one + name: primary + command: + - "tail -f /dev/null"