mirror of https://github.com/apache/druid.git
Ephemeral storage is respected from the overlord for peon tasks (#14201)
This commit is contained in:
parent
4c15e978f1
commit
123c4908c8
|
@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.fabric8.kubernetes.api.model.Container;
|
||||
|
@ -37,6 +36,7 @@ import io.fabric8.kubernetes.api.model.Pod;
|
|||
import io.fabric8.kubernetes.api.model.PodSpec;
|
||||
import io.fabric8.kubernetes.api.model.PodTemplateSpec;
|
||||
import io.fabric8.kubernetes.api.model.Quantity;
|
||||
import io.fabric8.kubernetes.api.model.ResourceRequirements;
|
||||
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
|
||||
import io.fabric8.kubernetes.api.model.batch.v1.Job;
|
||||
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
|
||||
|
@ -55,6 +55,7 @@ import org.apache.druid.server.log.StartupLoggingConfig;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
@ -104,7 +105,10 @@ public abstract class K8sTaskAdapter implements TaskAdapter
|
|||
public Job fromTask(Task task) throws IOException
|
||||
{
|
||||
String myPodName = System.getenv("HOSTNAME");
|
||||
Pod pod = client.executeRequest(client -> client.pods().inNamespace(taskRunnerConfig.namespace).withName(myPodName).get());
|
||||
Pod pod = client.executeRequest(client -> client.pods()
|
||||
.inNamespace(taskRunnerConfig.namespace)
|
||||
.withName(myPodName)
|
||||
.get());
|
||||
PeonCommandContext context = new PeonCommandContext(
|
||||
generateCommand(task),
|
||||
javaOpts(task),
|
||||
|
@ -267,13 +271,11 @@ public abstract class K8sTaskAdapter implements TaskAdapter
|
|||
mainContainer.setArgs(Collections.singletonList(Joiner.on(" ").join(context.getComamnd())));
|
||||
|
||||
mainContainer.setName("main");
|
||||
ImmutableMap<String, Quantity> resources = ImmutableMap.of(
|
||||
"cpu",
|
||||
new Quantity("1000", "m"),
|
||||
"memory",
|
||||
new Quantity(String.valueOf(containerSize))
|
||||
ResourceRequirements requirements = getResourceRequirements(
|
||||
mainContainer.getResources(),
|
||||
containerSize
|
||||
);
|
||||
mainContainer.setResources(new ResourceRequirementsBuilder().withRequests(resources).withLimits(resources).build());
|
||||
mainContainer.setResources(requirements);
|
||||
return mainContainer;
|
||||
}
|
||||
|
||||
|
@ -355,13 +357,24 @@ public abstract class K8sTaskAdapter implements TaskAdapter
|
|||
}
|
||||
|
||||
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.port=%d", DruidK8sConstants.PORT));
|
||||
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.plaintextPort=%d", DruidK8sConstants.PORT));
|
||||
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.tlsPort=%d", node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1));
|
||||
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format(
|
||||
"-Ddruid.plaintextPort=%d",
|
||||
DruidK8sConstants.PORT
|
||||
));
|
||||
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format(
|
||||
"-Ddruid.tlsPort=%d",
|
||||
node.isEnableTlsPort()
|
||||
? DruidK8sConstants.TLS_PORT
|
||||
: -1
|
||||
));
|
||||
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format(
|
||||
"-Ddruid.task.executor.tlsPort=%d",
|
||||
node.isEnableTlsPort() ? DruidK8sConstants.TLS_PORT : -1
|
||||
));
|
||||
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", node.isEnableTlsPort())
|
||||
javaOpts.add(org.apache.druid.java.util.common.StringUtils.format(
|
||||
"-Ddruid.task.executor.enableTlsPort=%s",
|
||||
node.isEnableTlsPort()
|
||||
)
|
||||
);
|
||||
return javaOpts;
|
||||
}
|
||||
|
@ -391,4 +404,29 @@ public abstract class K8sTaskAdapter implements TaskAdapter
|
|||
);
|
||||
return command;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize)
|
||||
{
|
||||
Map<String, Quantity> resourceMap = new HashMap<>();
|
||||
resourceMap.put("cpu", new Quantity("1000", "m"));
|
||||
resourceMap.put("memory", new Quantity(String.valueOf(containerSize)));
|
||||
ResourceRequirementsBuilder result = new ResourceRequirementsBuilder();
|
||||
if (requirements != null) {
|
||||
if (requirements.getRequests() == null || requirements.getRequests().isEmpty()) {
|
||||
requirements.setRequests(resourceMap);
|
||||
} else {
|
||||
requirements.getRequests().putAll(resourceMap);
|
||||
}
|
||||
if (requirements.getLimits() == null || requirements.getLimits().isEmpty()) {
|
||||
requirements.setLimits(resourceMap);
|
||||
} else {
|
||||
requirements.getLimits().putAll(resourceMap);
|
||||
}
|
||||
} else {
|
||||
requirements = result.withRequests(resourceMap).withLimits(resourceMap).build();
|
||||
}
|
||||
return requirements;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.Module;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||
import com.google.api.client.util.Joiner;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.fabric8.kubernetes.api.model.Container;
|
||||
|
@ -32,6 +33,8 @@ import io.fabric8.kubernetes.api.model.EnvVarBuilder;
|
|||
import io.fabric8.kubernetes.api.model.Pod;
|
||||
import io.fabric8.kubernetes.api.model.PodSpec;
|
||||
import io.fabric8.kubernetes.api.model.Quantity;
|
||||
import io.fabric8.kubernetes.api.model.ResourceRequirements;
|
||||
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
|
||||
import io.fabric8.kubernetes.api.model.batch.v1.Job;
|
||||
import io.fabric8.kubernetes.api.model.batch.v1.JobList;
|
||||
import io.fabric8.kubernetes.client.KubernetesClient;
|
||||
|
@ -42,6 +45,7 @@ import org.apache.druid.indexing.common.TestUtils;
|
|||
import org.apache.druid.indexing.common.config.TaskConfig;
|
||||
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
|
||||
import org.apache.druid.indexing.common.task.IndexTask;
|
||||
import org.apache.druid.indexing.common.task.NoopTask;
|
||||
import org.apache.druid.indexing.common.task.Task;
|
||||
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
|
||||
import org.apache.druid.java.util.common.HumanReadableBytes;
|
||||
|
@ -322,4 +326,91 @@ class K8sTaskAdapterTest
|
|||
.get();
|
||||
assertEquals(jsonMapper.writeValueAsString(config.peonMonitors), env.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEphemeralStorageIsRespected() throws IOException
|
||||
{
|
||||
TestKubernetesClient testClient = new TestKubernetesClient(client);
|
||||
Pod pod = K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class);
|
||||
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
|
||||
config.namespace = "test";
|
||||
SingleContainerTaskAdapter adapter =
|
||||
new SingleContainerTaskAdapter(testClient,
|
||||
config, taskConfig,
|
||||
startupLoggingConfig,
|
||||
node,
|
||||
jsonMapper
|
||||
);
|
||||
NoopTask task = NoopTask.create("id", 1);
|
||||
Job actual = adapter.createJobFromPodSpec(
|
||||
pod.getSpec(),
|
||||
task,
|
||||
new PeonCommandContext(
|
||||
Collections.singletonList("foo && bar"),
|
||||
new ArrayList<>(),
|
||||
new File("/tmp")
|
||||
)
|
||||
);
|
||||
Job expected = K8sTestUtils.fileToResource("expectedEphemeralOutput.yaml", Job.class);
|
||||
// something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results,
|
||||
// this would never happen in real life, but for the jdk 17 tests this is a problem
|
||||
// could be related to: https://bugs.openjdk.org/browse/JDK-8081450
|
||||
actual.getSpec()
|
||||
.getTemplate()
|
||||
.getSpec()
|
||||
.getContainers()
|
||||
.get(0)
|
||||
.getEnv()
|
||||
.removeIf(x -> x.getName().equals("TASK_JSON"));
|
||||
expected.getSpec()
|
||||
.getTemplate()
|
||||
.getSpec()
|
||||
.getContainers()
|
||||
.get(0)
|
||||
.getEnv()
|
||||
.removeIf(x -> x.getName().equals("TASK_JSON"));
|
||||
Assertions.assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEphemeralStorage()
|
||||
{
|
||||
// no resources set.
|
||||
Container container = new ContainerBuilder().build();
|
||||
ResourceRequirements result = K8sTaskAdapter.getResourceRequirements(
|
||||
container.getResources(),
|
||||
100
|
||||
);
|
||||
// requests and limits will only have 2 items, cpu / memory
|
||||
assertEquals(2, result.getLimits().size());
|
||||
assertEquals(2, result.getRequests().size());
|
||||
|
||||
// test with ephemeral storage
|
||||
ImmutableMap<String, Quantity> requestMap = ImmutableMap.of("ephemeral-storage", new Quantity("1Gi"));
|
||||
ImmutableMap<String, Quantity> limitMap = ImmutableMap.of("ephemeral-storage", new Quantity("10Gi"));
|
||||
container.setResources(new ResourceRequirementsBuilder().withRequests(requestMap).withLimits(limitMap).build());
|
||||
ResourceRequirements ephemeralResult = K8sTaskAdapter.getResourceRequirements(
|
||||
container.getResources(),
|
||||
100
|
||||
);
|
||||
// you will have ephemeral storage as well.
|
||||
assertEquals(3, ephemeralResult.getLimits().size());
|
||||
assertEquals(3, ephemeralResult.getRequests().size());
|
||||
// cpu and memory should be fixed
|
||||
assertEquals(result.getRequests().get("cpu"), ephemeralResult.getRequests().get("cpu"));
|
||||
assertEquals(result.getRequests().get("memory"), ephemeralResult.getRequests().get("memory"));
|
||||
assertEquals("1Gi", ephemeralResult.getRequests().get("ephemeral-storage").toString());
|
||||
|
||||
assertEquals(result.getLimits().get("cpu"), ephemeralResult.getLimits().get("cpu"));
|
||||
assertEquals(result.getLimits().get("memory"), ephemeralResult.getLimits().get("memory"));
|
||||
assertEquals("10Gi", ephemeralResult.getLimits().get("ephemeral-storage").toString());
|
||||
|
||||
// we should also preserve additional properties
|
||||
container.getResources().setAdditionalProperty("additional", "some-value");
|
||||
ResourceRequirements additionalProperties = K8sTaskAdapter.getResourceRequirements(
|
||||
container.getResources(),
|
||||
100
|
||||
);
|
||||
assertEquals(1, additionalProperties.getAdditionalProperties().size());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
name: test
|
||||
spec:
|
||||
containers:
|
||||
- command:
|
||||
- sleep
|
||||
- "3600"
|
||||
image: one
|
||||
name: primary
|
||||
resources:
|
||||
limits:
|
||||
cpu: "1m"
|
||||
memory: "1"
|
||||
ephemeral-storage: "10Gi"
|
||||
requests:
|
||||
cpu: "1m"
|
||||
memory: "1"
|
||||
ephemeral-storage: "1Gi"
|
||||
env:
|
||||
- name: "druid_monitoring_monitors"
|
||||
value: '["org.apache.druid.java.util.metrics.JvmMonitor", "org.apache.druid.server.metrics.TaskCountStatsMonitor"]'
|
||||
- command:
|
||||
- "tail -f /dev/null"
|
||||
image: two
|
||||
name: sidecar
|
|
@ -0,0 +1,65 @@
|
|||
apiVersion: "batch/v1"
|
||||
kind: "Job"
|
||||
metadata:
|
||||
annotations:
|
||||
task.id: "id"
|
||||
tls.enabled: "false"
|
||||
labels:
|
||||
druid.k8s.peons: "true"
|
||||
name: "id"
|
||||
spec:
|
||||
activeDeadlineSeconds: 14400
|
||||
backoffLimit: 0
|
||||
template:
|
||||
metadata:
|
||||
annotations:
|
||||
task.id: "id"
|
||||
tls.enabled: "false"
|
||||
labels:
|
||||
druid.k8s.peons: "true"
|
||||
spec:
|
||||
containers:
|
||||
- args:
|
||||
- "foo && bar"
|
||||
command:
|
||||
- "sh"
|
||||
- "-c"
|
||||
env:
|
||||
- name: "druid_monitoring_monitors"
|
||||
value: "[\"org.apache.druid.java.util.metrics.JvmMonitor\", \"org.apache.druid.server.metrics.TaskCountStatsMonitor\"\
|
||||
]"
|
||||
- name: "TASK_DIR"
|
||||
value: "/tmp"
|
||||
- name: "TASK_JSON"
|
||||
value: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA="
|
||||
- name: "JAVA_OPTS"
|
||||
value: ""
|
||||
- name: "druid_host"
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: "status.podIP"
|
||||
- name: "HOSTNAME"
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: "metadata.name"
|
||||
image: "one"
|
||||
name: "main"
|
||||
ports:
|
||||
- containerPort: 8091
|
||||
name: "druid-tls-port"
|
||||
protocol: "TCP"
|
||||
- containerPort: 8100
|
||||
name: "druid-port"
|
||||
protocol: "TCP"
|
||||
resources:
|
||||
limits:
|
||||
memory: "2400000000"
|
||||
cpu: "1000m"
|
||||
ephemeral-storage: 10Gi
|
||||
requests:
|
||||
memory: "2400000000"
|
||||
cpu: "1000m"
|
||||
ephemeral-storage: 1Gi
|
||||
hostname: "id"
|
||||
restartPolicy: "Never"
|
||||
ttlSecondsAfterFinished: 172800
|
Loading…
Reference in New Issue