Fix pod template reading logic (#15915)

* Fix pod template reading

* PR changes

* Fix unit tests
This commit is contained in:
George Shiqi Wu 2024-02-20 11:13:51 -05:00 committed by GitHub
parent 9eaaeb5c16
commit 2c0d1128f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 88 additions and 30 deletions

View File

@ -20,10 +20,6 @@
package org.apache.druid.k8s.overlord.taskadapter; package org.apache.druid.k8s.overlord.taskadapter;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.cfg.MapperConfig;
import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.EnvVar; import io.fabric8.kubernetes.api.model.EnvVar;
@ -60,10 +56,12 @@ import java.nio.charset.Charset;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
/** /**
* A PodTemplate {@link TaskAdapter} to transform tasks to kubernetes jobs and kubernetes pods to tasks * A PodTemplate {@link TaskAdapter} to transform tasks to kubernetes jobs and kubernetes pods to tasks
@ -84,7 +82,9 @@ public class PodTemplateTaskAdapter implements TaskAdapter
public static final String TYPE = "customTemplateAdapter"; public static final String TYPE = "customTemplateAdapter";
private static final Logger log = new Logger(PodTemplateTaskAdapter.class); private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";
private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.";
private final KubernetesTaskRunnerConfig taskRunnerConfig; private final KubernetesTaskRunnerConfig taskRunnerConfig;
private final TaskConfig taskConfig; private final TaskConfig taskConfig;
@ -212,37 +212,46 @@ public class PodTemplateTaskAdapter implements TaskAdapter
private HashMap<String, PodTemplate> initializePodTemplates(Properties properties) private HashMap<String, PodTemplate> initializePodTemplates(Properties properties)
{ {
HashMap<String, PodTemplate> podTemplateMap = new HashMap<>(); Set<String> taskAdapterTemplateKeys = getTaskAdapterTemplates(properties);
Optional<PodTemplate> basePodTemplate = loadPodTemplate("base", properties); if (!taskAdapterTemplateKeys.contains("base")) {
if (!basePodTemplate.isPresent()) { throw new IAE("Pod template task adapter requires a base pod template to be specified under druid.indexer.runner.k8s.podTemplate.base");
throw new IAE("Pod template task adapter requires a base pod template to be specified");
} }
podTemplateMap.put("base", basePodTemplate.get());
MapperConfig config = mapper.getDeserializationConfig(); HashMap<String, PodTemplate> podTemplateMap = new HashMap<>();
AnnotatedClass cls = AnnotatedClassResolver.resolveWithoutSuperTypes(config, Task.class); for (String taskAdapterTemplateKey : taskAdapterTemplateKeys) {
Collection<NamedType> taskSubtypes = mapper.getSubtypeResolver().collectAndResolveSubtypesByClass(config, cls); Optional<PodTemplate> template = loadPodTemplate(taskAdapterTemplateKey, properties);
for (NamedType namedType : taskSubtypes) { template.ifPresent(podTemplate -> podTemplateMap.put(taskAdapterTemplateKey, podTemplate));
String taskType = namedType.getName();
Optional<PodTemplate> template = loadPodTemplate(taskType, properties);
template.ifPresent(podTemplate -> podTemplateMap.put(taskType, podTemplate));
} }
return podTemplateMap; return podTemplateMap;
} }
private static Set<String> getTaskAdapterTemplates(Properties properties)
{
Set<String> taskAdapterTemplates = new HashSet<>();
for (String runtimeProperty : properties.stringPropertyNames()) {
if (runtimeProperty.startsWith(TASK_PROPERTY)) {
String[] taskAdapterPropertyPaths = runtimeProperty.split("\\.");
taskAdapterTemplates.add(taskAdapterPropertyPaths[taskAdapterPropertyPaths.length - 1]);
}
}
return taskAdapterTemplates;
}
private Optional<PodTemplate> loadPodTemplate(String key, Properties properties) private Optional<PodTemplate> loadPodTemplate(String key, Properties properties)
{ {
String property = StringUtils.format(TASK_PROPERTY, key); String property = TASK_PROPERTY + key;
String podTemplateFile = properties.getProperty(property); String podTemplateFile = properties.getProperty(property);
if (podTemplateFile == null) { if (podTemplateFile == null) {
log.debug("Pod template file not specified for [%s]", key); throw new IAE("Pod template file not specified for [%s]", property);
return Optional.empty();
} }
try { try {
return Optional.of(Serialization.unmarshal(Files.newInputStream(new File(podTemplateFile).toPath()), PodTemplate.class)); return Optional.of(Serialization.unmarshal(Files.newInputStream(new File(podTemplateFile).toPath()), PodTemplate.class));
} }
catch (Exception e) { catch (Exception e) {
throw new ISE(e, "Failed to load pod template file for [%s] at [%s]", property, podTemplateFile); throw new IAE(e, "Failed to load pod template file for [%s] at [%s]", property, podTemplateFile);
} }
} }

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.fabric8.kubernetes.api.model.PodTemplate; import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.PodTemplateBuilder;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
@ -33,7 +35,6 @@ import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.Base64Compression; import org.apache.druid.k8s.overlord.common.Base64Compression;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants; import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
@ -93,8 +94,8 @@ public class PodTemplateTaskAdapterTest
@Test @Test
public void test_fromTask_withoutBasePodTemplateInRuntimeProperites_raisesIAE() public void test_fromTask_withoutBasePodTemplateInRuntimeProperites_raisesIAE()
{ {
Assert.assertThrows( Exception exception = Assert.assertThrows(
"Pod template task adapter requires a base pod template to be specified", "No base prop should throw an IAE",
IAE.class, IAE.class,
() -> new PodTemplateTaskAdapter( () -> new PodTemplateTaskAdapter(
taskRunnerConfig, taskRunnerConfig,
@ -104,19 +105,20 @@ public class PodTemplateTaskAdapterTest
new Properties(), new Properties(),
taskLogs taskLogs
)); ));
Assert.assertEquals(exception.getMessage(), "Pod template task adapter requires a base pod template to be specified under druid.indexer.runner.k8s.podTemplate.base");
} }
@Test @Test
public void test_fromTask_withBasePodTemplateInRuntimeProperites_withEmptyFile_raisesISE() throws IOException public void test_fromTask_withBasePodTemplateInRuntimeProperites_withEmptyFile_raisesIAE() throws IOException
{ {
Path templatePath = Files.createFile(tempDir.resolve("empty.yaml")); Path templatePath = Files.createFile(tempDir.resolve("empty.yaml"));
Properties props = new Properties(); Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
Assert.assertThrows( Exception exception = Assert.assertThrows(
"Pod template task adapter requires a base pod template to be specified", "Empty base pod template should throw a exception",
ISE.class, IAE.class,
() -> new PodTemplateTaskAdapter( () -> new PodTemplateTaskAdapter(
taskRunnerConfig, taskRunnerConfig,
taskConfig, taskConfig,
@ -125,6 +127,9 @@ public class PodTemplateTaskAdapterTest
props, props,
taskLogs taskLogs
)); ));
Assert.assertTrue(exception.getMessage().contains("Failed to load pod template file for"));
} }
@Test @Test
@ -186,7 +191,7 @@ public class PodTemplateTaskAdapterTest
} }
@Test @Test
public void test_fromTask_withNoopPodTemplateInRuntimeProperties_withEmptyFile_raisesISE() throws IOException public void test_fromTask_withNoopPodTemplateInRuntimeProperties_withEmptyFile_raisesIAE() throws IOException
{ {
Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml")); Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
Path noopTemplatePath = Files.createFile(tempDir.resolve("noop.yaml")); Path noopTemplatePath = Files.createFile(tempDir.resolve("noop.yaml"));
@ -196,7 +201,7 @@ public class PodTemplateTaskAdapterTest
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", baseTemplatePath.toString()); props.setProperty("druid.indexer.runner.k8s.podTemplate.base", baseTemplatePath.toString());
props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", noopTemplatePath.toString()); props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", noopTemplatePath.toString());
Assert.assertThrows(ISE.class, () -> new PodTemplateTaskAdapter( Assert.assertThrows(IAE.class, () -> new PodTemplateTaskAdapter(
taskRunnerConfig, taskRunnerConfig,
taskConfig, taskConfig,
node, node,
@ -520,7 +525,51 @@ public class PodTemplateTaskAdapterTest
.collect(Collectors.toList()).get(0).getValue()); .collect(Collectors.toList()).get(0).getValue());
} }
@Test
public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperites() throws IOException
{
Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec);
Path kafkaTemplatePath = Files.createFile(tempDir.resolve("kafka.yaml"));
PodTemplate kafkaPodTemplate = new PodTemplateBuilder(podTemplateSpec)
.editTemplate()
.editSpec()
.setNewVolumeLike(0, new VolumeBuilder().withName("volume").build())
.endVolume()
.endSpec()
.endTemplate()
.build();
mapper.writeValue(kafkaTemplatePath.toFile(), kafkaPodTemplate);
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", baseTemplatePath.toString());
props.setProperty("druid.indexer.runner.k8s.podTemplate.index_kafka", kafkaTemplatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
taskRunnerConfig,
taskConfig,
node,
mapper,
props,
taskLogs
);
Task kafkaTask = new NoopTask("id", "id", "datasource", 0, 0, null) {
@Override
public String getType()
{
return "index_kafka";
}
};
Task noopTask = new NoopTask("id", "id", "datasource", 0, 0, null);
Job actual = adapter.fromTask(kafkaTask);
Assert.assertEquals(1, actual.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
actual = adapter.fromTask(noopTask);
Assert.assertEquals(0, actual.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
}
private void assertJobSpecsEqual(Job actual, Job expected) throws IOException private void assertJobSpecsEqual(Job actual, Job expected) throws IOException
{ {