Pod template task adapter (#13896)

* Pod template task adapter

* Use getBaseTaskDirPaths

* Remove unused task from getEnv

* Use Optional.ifPresent() instead of Optional.map()

* Pass absolute path

* Don't pass task to getEnv

* Assert the correct adapter is created

* Javadocs and Comments

* Add exception message to assertions
This commit is contained in:
Nicholas Lippis 2023-03-22 16:20:24 -04:00 committed by GitHub
parent 086eb26b74
commit d81d13b9ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1290 additions and 188 deletions

View File

@ -23,7 +23,6 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -35,38 +34,29 @@ import org.apache.commons.io.FileUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.k8s.overlord.common.TaskAdapter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
@ -101,36 +91,28 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
private final ScheduledExecutorService cleanupExecutor;
protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new ConcurrentHashMap<>();
private final StartupLoggingConfig startupLoggingConfig;
private final TaskAdapter<Pod, Job> adapter;
protected final TaskAdapter adapter;
private final KubernetesTaskRunnerConfig k8sConfig;
private final TaskQueueConfig taskQueueConfig;
private final TaskLogPusher taskLogPusher;
private final ListeningExecutorService exec;
private final KubernetesPeonClient client;
private final DruidNode node;
private final TaskConfig taskConfig;
public KubernetesTaskRunner(
StartupLoggingConfig startupLoggingConfig,
TaskAdapter<Pod, Job> adapter,
TaskAdapter adapter,
KubernetesTaskRunnerConfig k8sConfig,
TaskQueueConfig taskQueueConfig,
TaskLogPusher taskLogPusher,
KubernetesPeonClient client,
DruidNode node,
TaskConfig taskConfig
KubernetesPeonClient client
)
{
this.startupLoggingConfig = startupLoggingConfig;
this.adapter = adapter;
this.k8sConfig = k8sConfig;
this.taskQueueConfig = taskQueueConfig;
this.taskLogPusher = taskLogPusher;
this.client = client;
this.node = node;
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
this.exec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(taskQueueConfig.getMaxSize(), "k8s-task-runner-%d")
@ -139,7 +121,6 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
taskQueueConfig.getMaxSize() < Integer.MAX_VALUE,
"The task queue bounds how many concurrent k8s tasks you can have"
);
this.taskConfig = taskConfig;
}
@ -163,13 +144,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
JobResponse completedPhase;
Optional<Job> existingJob = client.jobExists(k8sTaskId);
if (!existingJob.isPresent()) {
PeonCommandContext context = new PeonCommandContext(
generateCommand(task),
javaOpts(task),
new File(taskConfig.getBaseTaskDirPaths().get(0)),
node.isEnableTlsPort()
);
Job job = adapter.fromTask(task, context);
Job job = adapter.fromTask(task);
log.info("Job created %s and ready to launch", k8sTaskId);
Pod peonPod = client.launchJobAndWaitForStart(
job,
@ -313,61 +288,6 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
return ImmutableMap.of("taskQueue", (long) taskQueueConfig.getMaxSize());
}
private List<String> javaOpts(Task task)
{
final List<String> javaOpts = new ArrayList<>();
Iterables.addAll(javaOpts, k8sConfig.javaOptsArray);
// Override task specific javaOpts
Object taskJavaOpts = task.getContextValue(
ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
);
if (taskJavaOpts != null) {
Iterables.addAll(
javaOpts,
new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
);
}
javaOpts.add(StringUtils.format("-Ddruid.port=%d", DruidK8sConstants.PORT));
javaOpts.add(StringUtils.format("-Ddruid.plaintextPort=%d", DruidK8sConstants.PORT));
javaOpts.add(StringUtils.format("-Ddruid.tlsPort=%d", node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1));
javaOpts.add(StringUtils.format(
"-Ddruid.task.executor.tlsPort=%d",
node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1
));
javaOpts.add(StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", node.isEnableTlsPort())
);
return javaOpts;
}
private List<String> generateCommand(Task task)
{
final List<String> command = new ArrayList<>();
command.add("/peon.sh");
command.add(taskConfig.getBaseTaskDirPaths().get(0));
command.add(task.getId());
command.add("1"); // the attemptId is always 1, we never run the task twice on the same pod.
String nodeType = task.getNodeType();
if (nodeType != null) {
command.add("--nodeType");
command.add(nodeType);
}
// If the task type is queryable, we need to load broadcast segments on the peon, used for
// join queries
if (task.supportsQueries()) {
command.add("--loadBroadcastSegments");
command.add("true");
}
log.info(
"Peon Command for K8s job: %s",
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
);
return command;
}
@Override
public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
{

View File

@ -86,20 +86,31 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
K8sTaskAdapter adapter;
if (kubernetesTaskRunnerConfig.sidecarSupport) {
adapter = new MultiContainerTaskAdapter(client, kubernetesTaskRunnerConfig, smileMapper);
adapter = new MultiContainerTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
);
} else {
adapter = new SingleContainerTaskAdapter(client, kubernetesTaskRunnerConfig, smileMapper);
adapter = new SingleContainerTaskAdapter(
client,
kubernetesTaskRunnerConfig,
taskConfig,
startupLoggingConfig,
druidNode,
smileMapper
);
}
runner = new KubernetesTaskRunner(
startupLoggingConfig,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
new DruidKubernetesPeonClient(client, kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs),
druidNode,
taskConfig
new DruidKubernetesPeonClient(client, kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs)
);
return runner;
}

View File

@ -23,7 +23,11 @@ import com.google.common.base.Predicate;
public class DruidK8sConstants
{
public static final String TASK = "task";
public static final String TASK_ID = "task.id";
public static final String TASK_TYPE = "task.type";
public static final String TASK_GROUP_ID = "task.group.id";
public static final String TASK_DATASOURCE = "task.datasource";
public static final int PORT = 8100;
public static final int TLS_PORT = 8091;
public static final String TLS_ENABLED = "tls.enabled";

View File

@ -40,12 +40,20 @@ import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -63,33 +71,48 @@ import java.util.Optional;
* to add some extra coordination to shut down sidecar containers when the main pod exits.
*/
public abstract class K8sTaskAdapter implements TaskAdapter<Pod, Job>
public abstract class K8sTaskAdapter implements TaskAdapter
{
private static final EmittingLogger log = new EmittingLogger(K8sTaskAdapter.class);
protected final KubernetesClientApi client;
protected final KubernetesTaskRunnerConfig config;
protected final KubernetesTaskRunnerConfig taskRunnerConfig;
protected final TaskConfig taskConfig;
protected final StartupLoggingConfig startupLoggingConfig;
protected final DruidNode node;
protected final ObjectMapper mapper;
public K8sTaskAdapter(
KubernetesClientApi client,
KubernetesTaskRunnerConfig config,
KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode node,
ObjectMapper mapper
)
{
this.client = client;
this.config = config;
this.taskRunnerConfig = taskRunnerConfig;
this.taskConfig = taskConfig;
this.startupLoggingConfig = startupLoggingConfig;
this.node = node;
this.mapper = mapper;
}
@Override
public Job fromTask(Task task, PeonCommandContext context) throws IOException
public Job fromTask(Task task) throws IOException
{
String myPodName = System.getenv("HOSTNAME");
Pod pod = client.executeRequest(client -> client.pods().inNamespace(config.namespace).withName(myPodName).get());
Pod pod = client.executeRequest(client -> client.pods().inNamespace(taskRunnerConfig.namespace).withName(myPodName).get());
PeonCommandContext context = new PeonCommandContext(
generateCommand(task),
javaOpts(task),
new File(taskConfig.getBaseTaskDirPaths().get(0)),
node.isEnableTlsPort()
);
PodSpec podSpec = pod.getSpec();
massageSpec(podSpec, config.primaryContainerName);
massageSpec(podSpec, taskRunnerConfig.primaryContainerName);
return createJobFromPodSpec(podSpec, task, context);
}
@ -125,9 +148,9 @@ public abstract class K8sTaskAdapter implements TaskAdapter<Pod, Job>
.endMetadata()
.withNewSpec()
.withTemplate(podTemplate)
.withActiveDeadlineSeconds(config.maxTaskDuration.toStandardDuration().getStandardSeconds())
.withActiveDeadlineSeconds(taskRunnerConfig.maxTaskDuration.toStandardDuration().getStandardSeconds())
.withBackoffLimit(0)
.withTtlSecondsAfterFinished((int) config.taskCleanupDelay.toStandardDuration().getStandardSeconds())
.withTtlSecondsAfterFinished((int) taskRunnerConfig.taskCleanupDelay.toStandardDuration().getStandardSeconds())
.endSpec()
.build();
}
@ -245,7 +268,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter<Pod, Job>
protected Map<String, String> addJobSpecificAnnotations(PeonCommandContext context, K8sTaskId k8sTaskId)
{
Map<String, String> annotations = config.annotations;
Map<String, String> annotations = taskRunnerConfig.annotations;
annotations.put(DruidK8sConstants.TASK_ID, k8sTaskId.getOriginalTaskId());
annotations.put(DruidK8sConstants.TLS_ENABLED, String.valueOf(context.isEnableTls()));
return annotations;
@ -253,7 +276,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter<Pod, Job>
protected Map<String, String> addJobSpecificLabels()
{
Map<String, String> labels = config.labels;
Map<String, String> labels = taskRunnerConfig.labels;
labels.put(DruidK8sConstants.LABEL_KEY, "true");
return labels;
}
@ -269,7 +292,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter<Pod, Job>
podSpec.setNodeName(null);
podSpec.setRestartPolicy("Never");
podSpec.setHostname(k8sTaskId.getK8sTaskId());
podSpec.setTerminationGracePeriodSeconds(config.graceTerminationPeriodSeconds);
podSpec.setTerminationGracePeriodSeconds(taskRunnerConfig.graceTerminationPeriodSeconds);
PodTemplateSpec podTemplate = new PodTemplateSpec();
ObjectMeta objectMeta = new ObjectMeta();
@ -304,4 +327,57 @@ public abstract class K8sTaskAdapter implements TaskAdapter<Pod, Job>
}
}
private List<String> javaOpts(Task task)
{
final List<String> javaOpts = new ArrayList<>();
Iterables.addAll(javaOpts, taskRunnerConfig.javaOptsArray);
// Override task specific javaOpts
Object taskJavaOpts = task.getContextValue(
ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
);
if (taskJavaOpts != null) {
Iterables.addAll(
javaOpts,
new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
);
}
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.port=%d", DruidK8sConstants.PORT));
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.plaintextPort=%d", DruidK8sConstants.PORT));
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.tlsPort=%d", node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1));
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format(
"-Ddruid.task.executor.tlsPort=%d",
node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1
));
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", node.isEnableTlsPort())
);
return javaOpts;
}
private List<String> generateCommand(Task task)
{
final List<String> command = new ArrayList<>();
command.add("/peon.sh");
command.add(new File(taskConfig.getBaseTaskDirPaths().get(0)).getAbsolutePath());
command.add("1"); // the attemptId is always 1, we never run the task twice on the same pod.
String nodeType = task.getNodeType();
if (nodeType != null) {
command.add("--nodeType");
command.add(nodeType);
}
// If the task type is queryable, we need to load broadcast segments on the peon, used for
// join queries
if (task.supportsQueries()) {
command.add("--loadBroadcastSegments");
command.add("true");
}
log.info(
"Peon Command for K8s job: %s",
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
);
return command;
}
}

View File

@ -34,8 +34,11 @@ import io.fabric8.kubernetes.api.model.VolumeMount;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import java.io.IOException;
import java.util.Collections;
@ -44,13 +47,18 @@ import java.util.Map;
public class MultiContainerTaskAdapter extends K8sTaskAdapter
{
public static String TYPE = "MultiContainer";
public MultiContainerTaskAdapter(
KubernetesClientApi client,
KubernetesTaskRunnerConfig config,
KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode druidNode,
ObjectMapper mapper
)
{
super(client, config, mapper);
super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper);
}
@Override
@ -87,7 +95,7 @@ public class MultiContainerTaskAdapter extends K8sTaskAdapter
{
return new ContainerBuilder()
.withName("kubexit")
.withImage(config.kubexitImage)
.withImage(taskRunnerConfig.kubexitImage)
.withCommand("cp", "/bin/kubexit", "/kubexit/kubexit")
.withVolumeMounts(new VolumeMountBuilder().withMountPath("/kubexit").withName("kubexit").build())
.build();

View File

@ -0,0 +1,259 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.cfg.MapperConfig;
import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.server.DruidNode;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
/**
* A PodTemplate {@link TaskAdapter} to transform tasks to kubernetes jobs and kubernetes pods to tasks
*
* Pod Templates
* This TaskAdapter allows the user to provide a pod template per druid task. If a pod template has
* not been provided for a task, then the provided base template will be used.
*
* Providing Pod Templates per Task
* Pod templates are provided as files, each pod template file path must be specified as a runtime property
* druid.indexer.runner.k8s.podTemplate.{task_name}=/path/to/podTemplate.yaml.
*
* Note that the base pod template must be specified as the runtime property
* druid.indexer.runner.k8s.podTemplate.base=/path/to/podTemplate.yaml
*/
public class PodTemplateTaskAdapter implements TaskAdapter
{
public static String TYPE = "PodTemplate";
private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";
private final KubernetesClientApi client;
private final KubernetesTaskRunnerConfig taskRunnerConfig;
private final TaskConfig taskConfig;
private final DruidNode node;
private final ObjectMapper mapper;
private final HashMap<String, PodTemplate> templates;
public PodTemplateTaskAdapter(
KubernetesClientApi client,
KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig,
DruidNode node,
ObjectMapper mapper,
Properties properties
)
{
this.client = client;
this.taskRunnerConfig = taskRunnerConfig;
this.taskConfig = taskConfig;
this.node = node;
this.mapper = mapper;
this.templates = initializePodTemplates(properties);
}
/**
* Create a {@link Job} from a {@link Task}
*
* 1. Select pod template based on task type
* 2. Add labels and annotations to the pod template including the task as a compressed and base64 encoded string
* 3. Add labels and annotations to the job
* 4. Add user specified active deadline seconds and job ttl
* 5. Set backoff limit to zero since druid does not support external systems retrying failed tasks
*
* @param task
* @return {@link Job}
* @throws IOException
*/
@Override
public Job fromTask(Task task) throws IOException
{
PodTemplate podTemplate = templates.getOrDefault(task.getType(), templates.get("base"));
if (podTemplate == null) {
throw new ISE("Pod template spec not found for task type [%s]", task.getType());
}
return new JobBuilder()
.withNewMetadata()
.withName(new K8sTaskId(task).getK8sTaskId())
.addToLabels(getJobLabels(taskRunnerConfig))
.addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
.endMetadata()
.withNewSpec()
.withTemplate(podTemplate.getTemplate())
.editTemplate()
.editOrNewMetadata()
.addToAnnotations(getPodTemplateAnnotations(task))
.addToLabels(getPodLabels(taskRunnerConfig))
.endMetadata()
.editSpec()
.editFirstContainer()
.addAllToEnv(getEnv())
.endContainer()
.endSpec()
.endTemplate()
.withActiveDeadlineSeconds(taskRunnerConfig.maxTaskDuration.toStandardDuration().getStandardSeconds())
.withBackoffLimit(0) // druid does not support an external system retrying failed tasks
.withTtlSecondsAfterFinished((int) taskRunnerConfig.taskCleanupDelay.toStandardDuration().getStandardSeconds())
.endSpec()
.build();
}
/**
* Transform a {@link Pod} to a {@link Task}
*
* 1. Find task annotation on the pod
* 2. Base 64 decode and decompress task, read into {@link Task}
*
* @param from
* @return {@link Task}
* @throws IOException
*/
@Override
public Task toTask(Pod from) throws IOException
{
Map<String, String> annotations = from.getMetadata().getAnnotations();
if (annotations == null) {
throw new IOE("No annotations found on pod [%s]", from.getMetadata().getName());
}
String task = annotations.get(DruidK8sConstants.TASK);
if (task == null) {
throw new IOE("No task annotation found on pod [%s]", from.getMetadata().getName());
}
return mapper.readValue(Base64Compression.decompressBase64(task), Task.class);
}
private HashMap<String, PodTemplate> initializePodTemplates(Properties properties)
{
HashMap<String, PodTemplate> podTemplateMap = new HashMap<>();
Optional<PodTemplate> basePodTemplate = loadPodTemplate("base", properties);
if (!basePodTemplate.isPresent()) {
throw new IAE("Pod template task adapter requires a base pod template to be specified");
}
podTemplateMap.put("base", basePodTemplate.get());
MapperConfig config = mapper.getDeserializationConfig();
AnnotatedClass cls = AnnotatedClassResolver.resolveWithoutSuperTypes(config, Task.class);
Collection<NamedType> taskSubtypes = mapper.getSubtypeResolver().collectAndResolveSubtypesByClass(config, cls);
for (NamedType namedType : taskSubtypes) {
String taskType = namedType.getName();
Optional<PodTemplate> template = loadPodTemplate(taskType, properties);
template.ifPresent(podTemplate -> podTemplateMap.put(taskType, podTemplate));
}
return podTemplateMap;
}
private Optional<PodTemplate> loadPodTemplate(String key, Properties properties)
{
String property = StringUtils.format(TASK_PROPERTY, key);
String podTemplateFile = properties.getProperty(property);
if (podTemplateFile == null) {
log.debug("Pod template file not specified for [%s]", key);
return Optional.empty();
}
try {
return Optional.of(client.executeRequest(client -> client.v1().podTemplates().load(new File(podTemplateFile)).get()));
}
catch (Exception e) {
throw new ISE(e, "Failed to load pod template file for [%s] at [%s]", property, podTemplateFile);
}
}
private Collection<EnvVar> getEnv()
{
return ImmutableList.of(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
.withValue(new File(taskConfig.getBaseTaskDirPaths().get(0)).getAbsolutePath())
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_JSON_ENV)
.withValueFrom(new EnvVarSourceBuilder().withFieldRef(new ObjectFieldSelector(
null,
StringUtils.format("metadata.annotations['%s']", DruidK8sConstants.TASK)
)).build()).build()
);
}
private Map<String, String> getPodLabels(KubernetesTaskRunnerConfig config)
{
return getJobLabels(config);
}
private Map<String, String> getPodTemplateAnnotations(Task task) throws IOException
{
return ImmutableMap.<String, String>builder()
.put(DruidK8sConstants.TASK, Base64Compression.compressBase64(mapper.writeValueAsString(task)))
.put(DruidK8sConstants.TLS_ENABLED, String.valueOf(node.isEnableTlsPort()))
.put(DruidK8sConstants.TASK_ID, task.getId())
.put(DruidK8sConstants.TASK_TYPE, task.getType())
.put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
.put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
.build();
}
private Map<String, String> getJobLabels(KubernetesTaskRunnerConfig config)
{
return ImmutableMap.<String, String>builder()
.putAll(config.labels)
.put(DruidK8sConstants.LABEL_KEY, "true")
.build();
}
private Map<String, String> getJobAnnotations(KubernetesTaskRunnerConfig config, Task task)
{
return ImmutableMap.<String, String>builder()
.putAll(config.annotations)
.put(DruidK8sConstants.TASK_ID, task.getId())
.put(DruidK8sConstants.TASK_TYPE, task.getType())
.put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
.put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
.build();
}
}

View File

@ -23,8 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import java.io.IOException;
import java.util.Collections;
@ -32,13 +35,18 @@ import java.util.Map;
public class SingleContainerTaskAdapter extends K8sTaskAdapter
{
public static String TYPE = "SingleContainer";
public SingleContainerTaskAdapter(
KubernetesClientApi client,
KubernetesTaskRunnerConfig config,
KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode druidNode,
ObjectMapper mapper
)
{
super(client, config, mapper);
super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper);
}
@Override

View File

@ -19,15 +19,17 @@
package org.apache.druid.k8s.overlord.common;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.indexing.common.task.Task;
import java.io.IOException;
public interface TaskAdapter<K, V>
public interface TaskAdapter
{
V fromTask(Task task, PeonCommandContext context) throws IOException;
Job fromTask(Task task) throws IOException;
Task toTask(K from) throws IOException;
Task toTask(Pod from) throws IOException;
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.k8s.overlord.common.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class KubernetesTaskRunnerFactoryTest
{
private ObjectMapper objectMapper;
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
private StartupLoggingConfig startupLoggingConfig;
private TaskQueueConfig taskQueueConfig;
private TaskLogPusher taskLogPusher;
private DruidNode druidNode;
private TaskConfig taskConfig;
@Before
public void setup()
{
objectMapper = new TestUtils().getTestObjectMapper();
kubernetesTaskRunnerConfig = new KubernetesTaskRunnerConfig();
startupLoggingConfig = new StartupLoggingConfig();
taskQueueConfig = new TaskQueueConfig(
1,
null,
null,
null
);
taskLogPusher = new NoopTaskLogs();
druidNode = new DruidNode(
"test",
"",
false,
0,
1,
true,
false
);
taskConfig = new TaskConfig(
"/tmp",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("/tmp")
);
}
@Test
public void test_get_returnsSameKuberentesTaskRunner_asBuild()
{
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
druidNode,
taskConfig
);
KubernetesTaskRunner expectedRunner = factory.build();
KubernetesTaskRunner actualRunner = factory.get();
Assert.assertEquals(expectedRunner, actualRunner);
}
@Test
public void test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSingleContainerTaskAdapter()
{
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
druidNode,
taskConfig
);
KubernetesTaskRunner runner = factory.build();
Assert.assertNotNull(runner);
Assert.assertTrue(runner.adapter instanceof SingleContainerTaskAdapter);
}
@Test
public void test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiContainerTaskAdapter()
{
kubernetesTaskRunnerConfig.sidecarSupport = true;
KubernetesTaskRunnerFactory factory = new KubernetesTaskRunnerFactory(
objectMapper,
kubernetesTaskRunnerConfig,
startupLoggingConfig,
taskQueueConfig,
taskLogPusher,
druidNode,
taskConfig
);
KubernetesTaskRunner runner = factory.build();
Assert.assertNotNull(runner);
Assert.assertTrue(runner.adapter instanceof MultiContainerTaskAdapter);
}
}

View File

@ -38,7 +38,6 @@ import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
@ -53,7 +52,6 @@ import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskAdapter;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
@ -83,16 +81,15 @@ import static org.mockito.Mockito.when;
public class KubernetesTaskRunnerTest
{
private TaskQueueConfig taskQueueConfig;
private StartupLoggingConfig startupLoggingConfig;
private ObjectMapper jsonMapper;
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
private TaskConfig taskConfig;
private TaskLogPusher taskLogPusher;
private DruidNode node;
private DruidNode druidNode;
public KubernetesTaskRunnerTest()
@Before
public void setUp()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
@ -103,36 +100,14 @@ public class KubernetesTaskRunnerTest
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
}
@Before
public void setUp()
{
taskConfig = new TaskConfig(
"src/test/resources",
"src/test/resources",
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
null
);
kubernetesTaskRunnerConfig = new KubernetesTaskRunnerConfig();
kubernetesTaskRunnerConfig.namespace = "test";
kubernetesTaskRunnerConfig.javaOptsArray = Collections.singletonList("-Xmx2g");
taskQueueConfig = new TaskQueueConfig(1, Period.millis(1), Period.millis(1), Period.millis(1));
startupLoggingConfig = new StartupLoggingConfig();
taskLogPusher = mock(TaskLogPusher.class);
node = mock(DruidNode.class);
when(node.isEnableTlsPort()).thenReturn(false);
druidNode = mock(DruidNode.class);
when(druidNode.isEnableTlsPort()).thenReturn(false);
}
@Test
@ -157,7 +132,7 @@ public class KubernetesTaskRunnerTest
when(peonPod.getStatus()).thenReturn(podStatus);
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
when(adapter.fromTask(eq(task), any())).thenReturn(job);
when(adapter.fromTask(eq(task))).thenReturn(job);
DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
@ -171,14 +146,11 @@ public class KubernetesTaskRunnerTest
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
startupLoggingConfig,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
peonClient,
node,
taskConfig
peonClient
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
@ -213,7 +185,7 @@ public class KubernetesTaskRunnerTest
when(peonPod.getStatus()).thenReturn(podStatus);
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
when(adapter.fromTask(eq(task), isA(PeonCommandContext.class))).thenReturn(job);
when(adapter.fromTask(eq(task))).thenReturn(job);
DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
@ -228,14 +200,11 @@ public class KubernetesTaskRunnerTest
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
startupLoggingConfig,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
peonClient,
node,
taskConfig
peonClient
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
@ -249,7 +218,7 @@ public class KubernetesTaskRunnerTest
peonPod.getStatus().getPodIP(),
DruidK8sConstants.PORT,
DruidK8sConstants.TLS_PORT,
node.isEnableTlsPort()
druidNode.isEnableTlsPort()
);
verify(spyRunner, times(1)).updateStatus(eq(task), eq(TaskStatus.success(task.getId(), expectedTaskLocation)));
}
@ -279,7 +248,7 @@ public class KubernetesTaskRunnerTest
when(peonPod.getStatus()).thenReturn(podStatus);
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
when(adapter.fromTask(eq(task), isA(PeonCommandContext.class))).thenReturn(job);
when(adapter.fromTask(eq(task))).thenReturn(job);
when(adapter.toTask(eq(peonPod))).thenReturn(task);
DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
@ -296,14 +265,11 @@ public class KubernetesTaskRunnerTest
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
startupLoggingConfig,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
peonClient,
node,
taskConfig
peonClient
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
Collection<? extends TaskRunnerWorkItem> workItems = spyRunner.getKnownTasks();
@ -344,7 +310,7 @@ public class KubernetesTaskRunnerTest
when(peonPod.getStatus()).thenReturn(podStatus);
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
when(adapter.fromTask(eq(task), isA(PeonCommandContext.class))).thenReturn(job);
when(adapter.fromTask(eq(task))).thenReturn(job);
when(adapter.toTask(eq(peonPod))).thenReturn(task);
DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
@ -361,14 +327,11 @@ public class KubernetesTaskRunnerTest
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
startupLoggingConfig,
adapter,
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
peonClient,
node,
taskConfig
peonClient
);
KubernetesTaskRunner spyRunner = spy(taskRunner);
Collection<? extends TaskRunnerWorkItem> workItems = spyRunner.getKnownTasks();
@ -404,14 +367,11 @@ public class KubernetesTaskRunnerTest
when(peonClient.getMainJobPod(any())).thenReturn(null).thenReturn(pod);
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
startupLoggingConfig,
mock(K8sTaskAdapter.class),
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
peonClient,
node,
taskConfig
peonClient
);
RunnerTaskState state = taskRunner.getRunnerTaskState("foo");
@ -434,14 +394,11 @@ public class KubernetesTaskRunnerTest
Period.millis(1)
);
assertThrows(IllegalArgumentException.class, () -> new KubernetesTaskRunner(
startupLoggingConfig,
mock(K8sTaskAdapter.class),
kubernetesTaskRunnerConfig,
taskQueueConfig,
taskLogPusher,
mock(DruidKubernetesPeonClient.class),
node,
taskConfig
mock(DruidKubernetesPeonClient.class)
));
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@ -29,10 +30,14 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@ -56,11 +61,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
// must have a kind / minikube cluster installed and the image pushed to your repository
public class DruidPeonClientIntegrationTest
{
private final KubernetesClientApi k8sClient;
private final DruidKubernetesPeonClient peonClient;
private final ObjectMapper jsonMapper;
private StartupLoggingConfig startupLoggingConfig;
private TaskConfig taskConfig;
private DruidNode druidNode;
private KubernetesClientApi k8sClient;
private DruidKubernetesPeonClient peonClient;
private ObjectMapper jsonMapper;
public DruidPeonClientIntegrationTest()
@BeforeEach
public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
@ -73,6 +82,33 @@ public class DruidPeonClientIntegrationTest
);
k8sClient = new DruidKubernetesClient();
peonClient = new DruidKubernetesPeonClient(k8sClient, "default", false);
druidNode = new DruidNode(
"test",
null,
false,
null,
null,
true,
false
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new TaskConfig(
"src/test/resources",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("src/test/resources")
);
}
@Disabled
@ -84,7 +120,14 @@ public class DruidPeonClientIntegrationTest
Task task = K8sTestUtils.getTask();
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "default";
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(k8sClient, config, jsonMapper);
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
k8sClient,
config,
taskConfig,
startupLoggingConfig,
druidNode,
jsonMapper
);
String taskBasePath = "/home/taskDir";
PeonCommandContext context = new PeonCommandContext(Collections.singletonList(
"sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0"

View File

@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.Container;
@ -36,11 +37,14 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -58,8 +62,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@EnableKubernetesMockClient(crud = true)
class K8sTaskAdapterTest
{
KubernetesClient client;
private KubernetesClient client;
private final StartupLoggingConfig startupLoggingConfig;
private final TaskConfig taskConfig;
private final DruidNode node;
private ObjectMapper jsonMapper;
public K8sTaskAdapterTest()
@ -73,6 +80,33 @@ class K8sTaskAdapterTest
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
node = new DruidNode(
"test",
null,
false,
null,
null,
true,
false
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new TaskConfig(
"src/test/resources",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("src/test/resources")
);
}
@Test
@ -83,7 +117,14 @@ class K8sTaskAdapterTest
config.namespace = "test";
config.annotations.put("annotation_key", "annotation_value");
config.labels.put("label_key", "label_value");
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(testClient, config, jsonMapper);
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
taskConfig,
startupLoggingConfig,
node,
jsonMapper
);
Task task = K8sTestUtils.getTask();
Job jobFromSpec = adapter.createJobFromPodSpec(
K8sTestUtils.getDummyPodSpec(),
@ -105,7 +146,14 @@ class K8sTaskAdapterTest
TestKubernetesClient testClient = new TestKubernetesClient(client);
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(testClient, config, jsonMapper);
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
taskConfig,
startupLoggingConfig,
node,
jsonMapper
);
Task task = K8sTestUtils.getTask();
Job jobFromSpec = adapter.createJobFromPodSpec(
K8sTestUtils.getDummyPodSpec(),

View File

@ -22,6 +22,7 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
@ -29,11 +30,15 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
@ -44,12 +49,14 @@ import java.util.Collections;
@EnableKubernetesMockClient(crud = true)
class MultiContainerTaskAdapterTest
{
KubernetesClient client;
private KubernetesClient client;
private StartupLoggingConfig startupLoggingConfig;
private TaskConfig taskConfig;
private DruidNode druidNode;
private ObjectMapper jsonMapper;
public MultiContainerTaskAdapterTest()
@BeforeEach
public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
@ -60,6 +67,33 @@ class MultiContainerTaskAdapterTest
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
druidNode = new DruidNode(
"test",
null,
false,
null,
null,
true,
false
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new TaskConfig(
"src/test/resources",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("src/test/resources")
);
}
@Test
@ -69,7 +103,14 @@ class MultiContainerTaskAdapterTest
Pod pod = client.pods().load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpec.yaml")).get();
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient, config, jsonMapper);
MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
testClient,
config,
taskConfig,
startupLoggingConfig,
druidNode,
jsonMapper
);
NoopTask task = NoopTask.create("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
@ -113,7 +154,14 @@ class MultiContainerTaskAdapterTest
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
config.primaryContainerName = "primary";
MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient, config, jsonMapper);
MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
testClient,
config,
taskConfig,
startupLoggingConfig,
druidNode,
jsonMapper
);
NoopTask task = NoopTask.create("id", 1);
PodSpec spec = pod.getSpec();
K8sTaskAdapter.massageSpec(spec, "primary");

View File

@ -0,0 +1,395 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.server.DruidNode;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Properties;
@EnableKubernetesMockClient()
public class PodTemplateTaskAdapterTest
{
@TempDir private Path tempDir;
private KubernetesClient client;
private KubernetesTaskRunnerConfig taskRunnerConfig;
private TestKubernetesClient testClient;
private PodTemplate podTemplateSpec;
private TaskConfig taskConfig;
private DruidNode node;
private ObjectMapper mapper;
@BeforeEach
public void setup()
{
taskRunnerConfig = new KubernetesTaskRunnerConfig();
testClient = new TestKubernetesClient(client);
taskConfig = new TaskConfig(
"/tmp",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("/tmp")
);
node = new DruidNode(
"test",
"",
false,
0,
1,
true,
false
);
mapper = new TestUtils().getTestObjectMapper();
podTemplateSpec = client
.v1()
.podTemplates()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("basePodTemplate.yaml")
)
.get();
}
@Test
public void test_fromTask_withoutBasePodTemplateInRuntimeProperites_raisesIAE()
{
Assert.assertThrows(
"Pod template task adapter requires a base pod template to be specified",
IAE.class,
() -> new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig,
taskConfig,
node,
mapper,
new Properties()
));
}
@Test
public void test_fromTask_withBasePodTemplateInRuntimeProperites_withEmptyFile_raisesISE() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("empty.yaml"));
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
Assert.assertThrows(
"Pod template task adapter requires a base pod template to be specified",
ISE.class,
() -> new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig,
taskConfig,
node,
mapper,
props
));
}
@Test
public void test_fromTask_withBasePodTemplateInRuntimeProperites() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig,
taskConfig,
node,
mapper,
props
);
Task task = new NoopTask(
"id",
"id",
"datasource",
0,
0,
null,
null,
null
);
Job actual = adapter.fromTask(task);
Job expected = client
.batch()
.v1()
.jobs()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("expectedNoopJob.yaml")
)
.get();
Assertions.assertEquals(expected, actual);
}
@Test
public void test_fromTask_withBasePodTemplateInRuntimeProperites_andTlsEnabled() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig,
taskConfig,
new DruidNode(
"test",
"",
false,
0,
1,
false,
true
),
mapper,
props
);
Task task = new NoopTask(
"id",
"id",
"datasource",
0,
0,
null,
null,
null
);
Job actual = adapter.fromTask(task);
Job expected = client
.batch()
.v1()
.jobs()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("expectedNoopJobTlsEnabled.yaml")
)
.get();
Assertions.assertEquals(expected, actual);
}
@Test
public void test_fromTask_withNoopPodTemplateInRuntimeProperties_withEmptyFile_raisesISE() throws IOException
{
Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
Path noopTemplatePath = Files.createFile(tempDir.resolve("noop.yaml"));
mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec);
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", baseTemplatePath.toString());
props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", noopTemplatePath.toString());
Assert.assertThrows(ISE.class, () -> new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig,
taskConfig,
node,
mapper,
props
));
}
@Test
public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("noop.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig,
taskConfig,
node,
mapper,
props
);
Task task = new NoopTask(
"id",
"id",
"datasource",
0,
0,
null,
null,
null
);
Job actual = adapter.fromTask(task);
Job expected = client
.batch()
.v1()
.jobs()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("expectedNoopJob.yaml")
)
.get();
Assertions.assertEquals(expected, actual);
}
@Test
public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig,
taskConfig,
node,
mapper,
props
);
Pod pod = client
.pods()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("basePodWithoutAnnotations.yaml")
)
.get();
Assert.assertThrows(IOE.class, () -> adapter.toTask(pod));
}
@Test
public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
Properties props = new Properties();
props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig,
taskConfig,
node,
mapper,
props
);
Pod basePod = client
.pods()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("basePodWithoutAnnotations.yaml")
)
.get();
Pod pod = new PodBuilder(basePod)
.editMetadata()
.addToAnnotations(Collections.emptyMap())
.endMetadata()
.build();
Assert.assertThrows(IOE.class, () -> adapter.toTask(pod));
}
@Test
public void test_fromTask() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
Properties props = new Properties();
props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig,
taskConfig,
node,
mapper,
props
);
Pod pod = client
.pods()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("basePod.yaml")
)
.get();
Task actual = adapter.toTask(pod);
Task expected = NoopTask.create("id", 1);
Assertions.assertEquals(expected, actual);
}
}

View File

@ -22,17 +22,22 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
@ -43,12 +48,14 @@ import java.util.Collections;
@EnableKubernetesMockClient(crud = true)
class SingleContainerTaskAdapterTest
{
KubernetesClient client;
private KubernetesClient client;
private StartupLoggingConfig startupLoggingConfig;
private TaskConfig taskConfig;
private DruidNode druidNode;
private ObjectMapper jsonMapper;
public SingleContainerTaskAdapterTest()
@BeforeEach
public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
@ -59,6 +66,33 @@ class SingleContainerTaskAdapterTest
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
);
druidNode = new DruidNode(
"test",
null,
false,
null,
null,
true,
false
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new TaskConfig(
"src/test/resources",
null,
null,
null,
null,
false,
null,
null,
null,
false,
false,
null,
null,
false,
ImmutableList.of("src/test/resources")
);
}
@Test
@ -70,7 +104,14 @@ class SingleContainerTaskAdapterTest
.get();
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(testClient, config, jsonMapper);
SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
taskConfig,
startupLoggingConfig,
druidNode,
jsonMapper
);
NoopTask task = NoopTask.create("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),

View File

@ -0,0 +1,23 @@
apiVersion: v1
kind: Pod
metadata:
name: "id-kmwkw"
labels:
job-name: "id"
druid.k8s.peons: "true"
annotations:
task: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA="
spec:
containers:
- command:
- sleep
- "3600"
env:
- name: "TASK_DIR"
value: "/tmp/id"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
image: one
name: primary

View File

@ -0,0 +1,12 @@
apiVersion: v1
kind: PodTemplate
metadata:
name: test
template:
spec:
containers:
- command:
- sleep
- "3600"
image: one
name: primary

View File

@ -0,0 +1,21 @@
apiVersion: v1
kind: Pod
metadata:
name: "id-kmwkw"
labels:
job-name: "id"
annotations:
spec:
containers:
- command:
- sleep
- "3600"
env:
- name: "TASK_DIR"
value: "/tmp/id"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
image: one
name: primary

View File

@ -0,0 +1,40 @@
apiVersion: batch/v1
kind: Job
metadata:
name: "id"
labels:
druid.k8s.peons: "true"
annotations:
task.id: "id"
task.type: "noop"
task.group.id: "id"
task.datasource: "datasource"
spec:
activeDeadlineSeconds: 14400
backoffLimit: 0
ttlSecondsAfterFinished: 172800
template:
metadata:
labels:
druid.k8s.peons: "true"
annotations:
task: "H4sIAAAAAAAAAEVOuQ4CIRD9l6kpVhObbY0xtrs2liOMSoKAHEZC+HeHrEczmXfmVUjFE4xgnfMgQCv++Qi4Bpf94QcVJpxdDrKbO4gLEBCyPeo70+vNMHBDnAhVWag/nihmkzh72s0cuuhANxfZYrMxAqSziV6s18aN9CkfK+ATtcGzNjqVfZ/0HRTokblEbdGjZBHGVWtvT9WXlc8AAAA="
tls.enabled: "false"
task.id: "id"
task.type: "noop"
task.group.id: "id"
task.datasource: "datasource"
spec:
containers:
- command:
- sleep
- "3600"
env:
- name: "TASK_DIR"
value: "/tmp"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
image: one
name: primary

View File

@ -0,0 +1,40 @@
apiVersion: batch/v1
kind: Job
metadata:
name: "id"
labels:
druid.k8s.peons: "true"
annotations:
task.id: "id"
task.type: "noop"
task.group.id: "id"
task.datasource: "datasource"
spec:
activeDeadlineSeconds: 14400
backoffLimit: 0
ttlSecondsAfterFinished: 172800
template:
metadata:
labels:
druid.k8s.peons: "true"
annotations:
task: "H4sIAAAAAAAAAEVOuQ4CIRD9l6kpVhObbY0xtrs2liOMSoKAHEZC+HeHrEczmXfmVUjFE4xgnfMgQCv++Qi4Bpf94QcVJpxdDrKbO4gLEBCyPeo70+vNMHBDnAhVWag/nihmkzh72s0cuuhANxfZYrMxAqSziV6s18aN9CkfK+ATtcGzNjqVfZ/0HRTokblEbdGjZBHGVWtvT9WXlc8AAAA="
tls.enabled: "true"
task.id: "id"
task.type: "noop"
task.group.id: "id"
task.datasource: "datasource"
spec:
containers:
- command:
- sleep
- "3600"
env:
- name: "TASK_DIR"
value: "/tmp"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
image: one
name: primary