mirror of https://github.com/apache/druid.git
Better sidecar support (#13655)
* Better sidecar support
* remove un-thrown exception from test
* Druid you are such a stickler about spelling :)
* Only require the primaryContainerName, no need to exclude containers
parent fa4cab405f
commit c1f283fd31

@@ -56,6 +56,7 @@ Additional Configuration
 |--------|---------------|-----------|-------|--------|
 |`druid.indexer.runner.debugJobs`|`boolean`|Clean up K8s jobs after tasks complete.|False|No|
 |`druid.indexer.runner.sidecarSupport`|`boolean`|If your overlord pod has sidecars, this will attempt to start the task with the same sidecars as the overlord pod.|False|No|
+|`druid.indexer.runner.primaryContainerName`|`String`|If running with sidecars, `primaryContainerName` should be the name of your Druid container, such as `druid-overlord`.|First container in the `podSpec` list|No|
 |`druid.indexer.runner.kubexitImage`|`String`|Uses the kubexit project to help shut down sidecars when the main pod completes; otherwise jobs with sidecars never terminate.|karlkfi/kubexit:latest|No|
 |`druid.indexer.runner.disableClientProxy`|`boolean`|Use this if you have a global http(s) proxy and you wish to bypass it.|false|No|
 |`druid.indexer.runner.maxTaskDuration`|`Duration`|Maximum time a task is allowed to run before it is killed.|`PT4H`|No|

@@ -41,6 +41,13 @@ public class KubernetesTaskRunnerConfig
   @JsonProperty
   public boolean sidecarSupport = false;

+  @JsonProperty
+  // if this is not set, then the first container in your pod spec is assumed to be the overlord container.
+  // usually this is fine, but when you are dynamically adding sidecars like istio, the service mesh could
+  // in fact place the istio-proxy container as the first container. Thus you would specify this value to
+  // the name of your primary container. eg) druid-overlord
+  public String primaryContainerName = null;
+
   @JsonProperty
   // for multi-container jobs, we need this image to shut down sidecars after the main container
   // has completed

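For reference, a minimal sketch of how these fields might be set when the overlord pod runs with injected sidecars; the namespace and container name here are illustrative, only the field names come from `KubernetesTaskRunnerConfig` above:

```java
// Sketch only: illustrative values, not part of the commit.
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "druid";           // K8s namespace the task jobs run in
config.sidecarSupport = true;         // launch task pods with the overlord's sidecars
// If unset, the first container in the pod spec is assumed to be the Druid container,
// which breaks when a service mesh injects e.g. istio-proxy as the first container.
config.primaryContainerName = "druid-overlord";
```
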
@@ -167,6 +167,7 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
           .jobs()
           .inNamespace(namespace)
           .withName(taskId.getK8sTaskId())
+          .inContainer("main")
           .getLogReader();
       if (reader == null) {
         return Optional.absent();

@@ -88,13 +88,17 @@ public abstract class K8sTaskAdapter implements TaskAdapter<Pod, Job>
   {
     String myPodName = System.getenv("HOSTNAME");
     Pod pod = client.executeRequest(client -> client.pods().inNamespace(config.namespace).withName(myPodName).get());
-    return createJobFromPodSpec(pod.getSpec(), task, context);
+    PodSpec podSpec = pod.getSpec();
+    massageSpec(podSpec, config.primaryContainerName);
+    return createJobFromPodSpec(podSpec, task, context);
   }

   @Override
   public Task toTask(Pod from) throws IOException
   {
-    List<EnvVar> envVars = from.getSpec().getContainers().get(0).getEnv();
+    PodSpec podSpec = from.getSpec();
+    massageSpec(podSpec, "main");
+    List<EnvVar> envVars = podSpec.getContainers().get(0).getEnv();
     Optional<EnvVar> taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst();
     String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null);
     if (contents == null) {

@@ -104,12 +108,13 @@ public abstract class K8sTaskAdapter implements TaskAdapter<Pod, Job>
   }

   @VisibleForTesting
-  public abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException;
+  abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException;

   protected Job buildJob(
       K8sTaskId k8sTaskId,
       Map<String, String> labels,
-      Map<String, String> annotations, PodTemplateSpec podTemplate
+      Map<String, String> annotations,
+      PodTemplateSpec podTemplate
   )
   {
     return new JobBuilder()

@@ -275,4 +280,28 @@ public abstract class K8sTaskAdapter implements TaskAdapter<Pod, Job>
     return podTemplate;
   }
+
+  @VisibleForTesting
+  static void massageSpec(PodSpec spec, String primaryContainerName)
+  {
+    // find the primary container and make it first
+    if (StringUtils.isNotBlank(primaryContainerName)) {
+      int i = 0;
+      while (i < spec.getContainers().size()) {
+        if (primaryContainerName.equals(spec.getContainers().get(i).getName())) {
+          break;
+        }
+        i++;
+      }
+      // if the named primary container is not found, fail instead of guessing
+      if (i >= spec.getContainers().size()) {
+        throw new IllegalArgumentException("Could not find container named: "
+                                           + primaryContainerName
+                                           + " in PodSpec");
+      }
+      Container primary = spec.getContainers().get(i);
+      spec.getContainers().remove(i);
+      spec.getContainers().add(0, primary);
+    }
+  }

 }

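To illustrate the scenario described in the `primaryContainerName` comment above (a service mesh injecting `istio-proxy` as the first container), here is a minimal sketch of calling `massageSpec`; the container names are illustrative, and the fabric8 `PodSpec`/`ContainerBuilder` classes are the same ones the tests below import:

```java
// Sketch only: a pod spec where an injected proxy happens to be listed first.
PodSpec spec = new PodSpec();
spec.setContainers(new ArrayList<>(Arrays.asList(
    new ContainerBuilder().withName("istio-proxy").build(),
    new ContainerBuilder().withName("druid-overlord").build())));

// Moves the named primary container to the front of the list, throwing if no
// container has that name, so code that assumes "first container == Druid
// container" keeps working.
K8sTaskAdapter.massageSpec(spec, "druid-overlord");
// spec.getContainers().get(0).getName() is now "druid-overlord"
```
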
@@ -54,7 +54,7 @@ public class MultiContainerTaskAdapter extends K8sTaskAdapter
   }

   @Override
-  public Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException
+  Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException
   {
     K8sTaskId k8sTaskId = new K8sTaskId(task.getId());

@@ -42,7 +42,7 @@ public class SingleContainerTaskAdapter extends K8sTaskAdapter
   }

   @Override
-  public Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException
+  Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException
   {
     K8sTaskId k8sTaskId = new K8sTaskId(task.getId());

@@ -24,8 +24,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodSpec;
 import io.fabric8.kubernetes.api.model.Quantity;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.client.KubernetesClient;

@@ -38,6 +41,7 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;

 import java.io.File;

@@ -160,4 +164,45 @@ class K8sTaskAdapterTest
     expected = (long) ((HumanReadableBytes.parse("512m") + HumanReadableBytes.parse("1g")) * 1.2);
     assertEquals(expected, K8sTaskAdapter.getContainerMemory(context));
   }
+
+  @Test
+  void testMassagingSpec()
+  {
+    PodSpec spec = new PodSpec();
+    List<Container> containers = new ArrayList<>();
+    containers.add(new ContainerBuilder()
+                       .withName("secondary").build());
+    containers.add(new ContainerBuilder()
+                       .withName("sidecar").build());
+    containers.add(new ContainerBuilder()
+                       .withName("primary").build());
+    spec.setContainers(containers);
+    K8sTaskAdapter.massageSpec(spec, "primary");
+
+    List<Container> actual = spec.getContainers();
+    Assertions.assertEquals(3, containers.size());
+    Assertions.assertEquals("primary", actual.get(0).getName());
+    Assertions.assertEquals("secondary", actual.get(1).getName());
+    Assertions.assertEquals("sidecar", actual.get(2).getName());
+  }
+
+  @Test
+  void testNoPrimaryFound()
+  {
+    PodSpec spec = new PodSpec();
+    List<Container> containers = new ArrayList<>();
+    containers.add(new ContainerBuilder()
+                       .withName("istio-proxy").build());
+    containers.add(new ContainerBuilder()
+                       .withName("main").build());
+    containers.add(new ContainerBuilder()
+                       .withName("sidecar").build());
+    spec.setContainers(containers);
+
+
+    Assertions.assertThrows(IllegalArgumentException.class, () -> {
+      K8sTaskAdapter.massageSpec(spec, "primary");
+    });
+  }
+
 }

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodSpec;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;

@@ -104,4 +105,50 @@ class MultiContainerTaskAdapterTest
     Assertions.assertEquals(expected, actual);
   }

+  @Test
+  public void testMultiContainerSupportWithNamedContainer() throws IOException
+  {
+    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    Pod pod = client.pods().load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpecOrder.yaml")).get();
+    KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
+    config.namespace = "test";
+    config.primaryContainerName = "primary";
+    MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient, config, jsonMapper);
+    NoopTask task = NoopTask.create("id", 1);
+    PodSpec spec = pod.getSpec();
+    K8sTaskAdapter.massageSpec(spec, "primary");
+    Job actual = adapter.createJobFromPodSpec(
+        spec,
+        task,
+        new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
+                               new ArrayList<>(),
+                               new File("/tmp")
+        )
+    );
+    Job expected = client.batch()
+                         .v1()
+                         .jobs()
+                         .load(this.getClass().getClassLoader().getResourceAsStream("expectedMultiContainerOutputOrder.yaml"))
+                         .get();
+
+    // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results,
+    // this would never happen in real life, but for the jdk 17 tests this is a problem
+    // could be related to: https://bugs.openjdk.org/browse/JDK-8081450
+    actual.getSpec()
+          .getTemplate()
+          .getSpec()
+          .getContainers()
+          .get(0)
+          .getEnv()
+          .removeIf(x -> x.getName().equals("TASK_JSON"));
+    expected.getSpec()
+            .getTemplate()
+            .getSpec()
+            .getContainers()
+            .get(0)
+            .getEnv()
+            .removeIf(x -> x.getName().equals("TASK_JSON"));
+    Assertions.assertEquals(expected, actual);
+  }
+
 }

@@ -0,0 +1,108 @@
apiVersion: "batch/v1"
kind: "Job"
metadata:
  annotations:
    task.id: "id"
    tls.enabled: "false"
  labels:
    druid.k8s.peons: "true"
  name: "id"
spec:
  activeDeadlineSeconds: 14400
  backoffLimit: 0
  template:
    metadata:
      annotations:
        task.id: "id"
        tls.enabled: "false"
      labels:
        druid.k8s.peons: "true"
    spec:
      hostname: "id"
      containers:
      - args:
        - "/kubexit/kubexit /bin/sh -c \"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1\""
        command:
        - "/bin/sh"
        - "-c"
        env:
        - name: "TASK_DIR"
          value: "/tmp"
        - name: "TASK_JSON"
          value: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA="
        - name: "JAVA_OPTS"
          value: ""
        - name: "druid_host"
          valueFrom:
            fieldRef:
              fieldPath: "status.podIP"
        - name: "HOSTNAME"
          valueFrom:
            fieldRef:
              fieldPath: "metadata.name"
        - name: "KUBEXIT_NAME"
          value: "main"
        - name: "KUBEXIT_GRAVEYARD"
          value: "/graveyard"

        image: "one"
        name: "main"
        ports:
        - containerPort: 8091
          name: "druid-tls-port"
          protocol: "TCP"
        - containerPort: 8100
          name: "druid-port"
          protocol: "TCP"
        resources:
          limits:
            cpu: "1000m"
            memory: "2400000000"
          requests:
            cpu: "1000m"
            memory: "2400000000"
        volumeMounts:
        - mountPath: "/graveyard"
          name: "graveyard"
        - mountPath: "/kubexit"
          name: "kubexit"
      - args:
        - "/kubexit/kubexit /bin/sh -c \"/bin/sidekick -loggingEnabled=true -platform=platform\
          \ -splunkCluster=cluster -splunkIndexName=druid -splunkSourceType=json -splunkWorkingDir=/opt/splunkforwarder\
          \ -dataCenter=dc -environment=env -application=druid -instance=instance\
          \ -logFiles=/logs/druid/*.log\" || true"
        command:
        - "/bin/sh"
        - "-c"
        env:
        - name: "KUBEXIT_NAME"
          value: "sidecar"
        - name: "KUBEXIT_GRAVEYARD"
          value: "/graveyard"
        - name: "KUBEXIT_DEATH_DEPS"
          value: "main"
        image: "two"
        name: "sidecar"
        volumeMounts:
        - mountPath: "/graveyard"
          name: "graveyard"
        - mountPath: "/kubexit"
          name: "kubexit"
      initContainers:
      - command:
        - "cp"
        - "/bin/kubexit"
        - "/kubexit/kubexit"
        image: "karlkfi/kubexit:v0.3.2"
        name: "kubexit"
        volumeMounts:
        - mountPath: "/kubexit"
          name: "kubexit"
      restartPolicy: "Never"
      volumes:
      - emptyDir:
          medium: "Memory"
        name: "graveyard"
      - emptyDir: {}
        name: "kubexit"
  ttlSecondsAfterFinished: 172800

@@ -0,0 +1,26 @@
apiVersion: v1
kind: Pod
metadata:
  name: test
spec:
  containers:
  - image: two
    name: sidecar
    args:
    - -loggingEnabled=true
    - -platform=platform
    - -splunkCluster=cluster
    - -splunkIndexName=druid
    - -splunkSourceType=json
    - -splunkWorkingDir=/opt/splunkforwarder
    - -dataCenter=dc
    - -environment=env
    - -application=druid
    - -instance=instance
    - -logFiles=/logs/druid/*.log
    command:
    - /bin/sidekick
  - image: one
    name: primary
    command:
    - "tail -f /dev/null"