Support CPU resource configurable for Kubernates job under MoK Mode (#16008)

* support CPU resource configurable for Kubernates job

* update property doc

* fix test name

* refine doc format
This commit is contained in:
Sensor 2024-03-04 23:12:09 +08:00 committed by GitHub
parent ec52f686c0
commit 4e9b758661
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 214 additions and 28 deletions

View File

@ -236,6 +236,7 @@ data:
|`druid.indexer.runner.peonMonitors`| `JsonArray` | Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord. |`[]`|No|
|`druid.indexer.runner.graceTerminationPeriodSeconds`| `Long` | Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. |`PT30S` (K8s default)|No|
|`druid.indexer.runner.capacity`| `Integer` | Number of concurrent jobs that can be sent to Kubernetes. |`2147483647`|No|
|`druid.indexer.runner.cpuCoreInMicro`| `Integer` | Number of CPU micro core for the task. | `1000`|No|
### Metrics added

View File

@ -101,6 +101,10 @@ public class KubernetesTaskRunnerConfig
@NotNull
private List<String> javaOptsArray = ImmutableList.of();
@JsonProperty
@NotNull
private int cpuCoreInMicro = 0;
@JsonProperty
@NotNull
private Map<String, String> labels = ImmutableMap.of();
@ -133,6 +137,7 @@ public class KubernetesTaskRunnerConfig
Period k8sjobLaunchTimeout,
List<String> peonMonitors,
List<String> javaOptsArray,
int cpuCoreInMicro,
Map<String, String> labels,
Map<String, String> annotations,
Integer capacity
@ -184,6 +189,10 @@ public class KubernetesTaskRunnerConfig
javaOptsArray,
this.javaOptsArray
);
this.cpuCoreInMicro = ObjectUtils.defaultIfNull(
cpuCoreInMicro,
this.cpuCoreInMicro
);
this.labels = ObjectUtils.defaultIfNull(
labels,
this.labels
@ -264,6 +273,11 @@ public class KubernetesTaskRunnerConfig
return javaOptsArray;
}
public int getCpuCoreInMicro()
{
return cpuCoreInMicro;
}
public Map<String, String> getLabels()
{
return labels;
@ -299,6 +313,7 @@ public class KubernetesTaskRunnerConfig
private Period k8sjobLaunchTimeout;
private List<String> peonMonitors;
private List<String> javaOptsArray;
private int cpuCoreInMicro;
private Map<String, String> labels;
private Map<String, String> annotations;
private Integer capacity;
@ -379,6 +394,12 @@ public class KubernetesTaskRunnerConfig
return this;
}
public Builder withCpuCore(int cpuCore)
{
this.cpuCoreInMicro = cpuCore;
return this;
}
public Builder withJavaOptsArray(List<String> javaOptsArray)
{
this.javaOptsArray = javaOptsArray;
@ -397,6 +418,7 @@ public class KubernetesTaskRunnerConfig
return this;
}
public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity)
{
this.capacity = capacity;
@ -419,6 +441,7 @@ public class KubernetesTaskRunnerConfig
this.k8sjobLaunchTimeout,
this.peonMonitors,
this.javaOptsArray,
this.cpuCoreInMicro,
this.labels,
this.annotations,
this.capacity

View File

@ -30,6 +30,8 @@ public class DruidK8sConstants
public static final String TASK_DATASOURCE = "task.datasource";
public static final int PORT = 8100;
public static final int TLS_PORT = 8091;
public static final int DEFAULT_CPU_MILLICORES = 1000;
public static final String DEFAULT_JAVA_HEAP_SIZE = "1G";
public static final String TLS_ENABLED = "tls.enabled";
public static final String TASK_JSON_ENV = "TASK_JSON";
public static final String TASK_DIR_ENV = "TASK_DIR";

View File

@ -31,17 +31,25 @@ public class PeonCommandContext
private final List<String> javaOpts;
private final File taskDir;
private final boolean enableTls;
private final int CpuMicroCore;
public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir)
public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir, int CpuMicroCore)
{
this(comamnd, javaOpts, taskDir, false);
this(comamnd, javaOpts, taskDir, CpuMicroCore, false);
}
public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir, boolean enableTls)
public PeonCommandContext(
List<String> comamnd,
List<String> javaOpts,
File taskDir,
int CpuMicroCore,
boolean enableTls
)
{
this.comamnd = comamnd;
this.javaOpts = javaOpts;
this.taskDir = taskDir;
this.CpuMicroCore = CpuMicroCore;
this.enableTls = enableTls;
}
@ -66,6 +74,11 @@ public class PeonCommandContext
return taskDir;
}
public int getCpuMicroCore()
{
return CpuMicroCore;
}
public boolean isEnableTls()
{
return enableTls;

View File

@ -125,6 +125,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
generateCommand(task),
javaOpts(task),
taskConfig.getBaseTaskDir(),
taskRunnerConfig.getCpuCoreInMicro(),
node.isEnableTlsPort()
);
PodSpec podSpec = pod.getSpec();
@ -216,7 +217,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
{
List<String> javaOpts = context.getJavaOpts();
Optional<Long> optionalXmx = getJavaOptValueBytes("-Xmx", javaOpts);
long heapSize = HumanReadableBytes.parse("1g");
long heapSize = HumanReadableBytes.parse(DruidK8sConstants.DEFAULT_JAVA_HEAP_SIZE);
if (optionalXmx.isPresent()) {
heapSize = optionalXmx.get();
}
@ -319,7 +320,8 @@ public abstract class K8sTaskAdapter implements TaskAdapter
mainContainer.setName("main");
ResourceRequirements requirements = getResourceRequirements(
mainContainer.getResources(),
containerSize
containerSize,
context.getCpuMicroCore()
);
mainContainer.setResources(requirements);
return mainContainer;
@ -457,10 +459,13 @@ public abstract class K8sTaskAdapter implements TaskAdapter
}
@VisibleForTesting
static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize)
static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize, int cpuMicroCore)
{
Map<String, Quantity> resourceMap = new HashMap<>();
resourceMap.put("cpu", new Quantity("1000", "m"));
resourceMap.put(
"cpu",
new Quantity(String.valueOf(cpuMicroCore > 0 ? cpuMicroCore : DruidK8sConstants.DEFAULT_CPU_MILLICORES), "m")
);
resourceMap.put("memory", new Quantity(String.valueOf(containerSize)));
ResourceRequirementsBuilder result = new ResourceRequirementsBuilder();
if (requirements != null) {

View File

@ -125,9 +125,12 @@ public class DruidPeonClientIntegrationTest
null
);
String taskBasePath = "/home/taskDir";
PeonCommandContext context = new PeonCommandContext(Collections.singletonList(
"sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0"
), new ArrayList<>(), new File(taskBasePath));
PeonCommandContext context = new PeonCommandContext(
Collections.singletonList("sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0"),
new ArrayList<>(),
new File(taskBasePath),
config.getCpuCoreInMicro()
);
Job job = adapter.createJobFromPodSpec(podSpec, task, context);

View File

@ -190,7 +190,7 @@ class K8sTaskAdapterTest
Job jobFromSpec = adapter.createJobFromPodSpec(
K8sTestUtils.getDummyPodSpec(),
task,
new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/"))
new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/"), config.getCpuCoreInMicro())
);
client.batch().v1().jobs().inNamespace("test").create(jobFromSpec);
JobList jobList = client.batch().v1().jobs().inNamespace("test").list();
@ -391,7 +391,8 @@ class K8sTaskAdapterTest
PeonCommandContext context = new PeonCommandContext(
new ArrayList<>(),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
0
);
assertEquals(expected, K8sTaskAdapter.getContainerMemory(context));
@ -399,7 +400,8 @@ class K8sTaskAdapterTest
new ArrayList<>(),
Collections.singletonList(
"-server -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.io.tmpdir=/druid/data -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"),
new File("/tmp")
new File("/tmp"),
0
);
expected = (long) ((HumanReadableBytes.parse("512m") + HumanReadableBytes.parse("1g")) * 1.2);
assertEquals(expected, K8sTaskAdapter.getContainerMemory(context));
@ -452,7 +454,8 @@ class K8sTaskAdapterTest
PeonCommandContext context = new PeonCommandContext(
new ArrayList<>(),
new ArrayList<>(),
new File("/tmp/")
new File("/tmp/"),
0
);
KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
.withNamespace("test")
@ -548,7 +551,8 @@ class K8sTaskAdapterTest
new PeonCommandContext(
Collections.singletonList("foo && bar"),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedEphemeralOutput.yaml", Job.class);
@ -572,6 +576,63 @@ class K8sTaskAdapterTest
Assertions.assertEquals(expected, actual);
}
@Test
void testCPUResourceIsEspected() throws IOException
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
Pod pod = K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class);
List<String> javaOpts = new ArrayList<>();
javaOpts.add("-Xms1G -Xmx2G -XX:MaxDirectMemorySize=3G");
KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
.withNamespace("test")
.withJavaOptsArray(javaOpts)
.withCpuCore(2000)
.build();
SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
taskConfig,
startupLoggingConfig,
node,
jsonMapper,
taskLogs
);
NoopTask task = K8sTestUtils.createTask("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
task,
new PeonCommandContext(
Collections.singletonList("foo && bar"),
javaOpts,
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedCPUResourceOutput.yaml", Job.class);
// something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results,
// this would never happen in real life, but for the jdk 17 tests this is a problem
// could be related to: https://bugs.openjdk.org/browse/JDK-8081450
actual.getSpec()
.getTemplate()
.getSpec()
.getContainers()
.get(0)
.getEnv()
.removeIf(x -> x.getName().equals("TASK_JSON"));
expected.getSpec()
.getTemplate()
.getSpec()
.getContainers()
.get(0)
.getEnv()
.removeIf(x -> x.getName().equals("TASK_JSON"));
Assertions.assertEquals(expected, actual);
}
@Test
void testEphemeralStorage()
{
@ -579,7 +640,8 @@ class K8sTaskAdapterTest
Container container = new ContainerBuilder().build();
ResourceRequirements result = K8sTaskAdapter.getResourceRequirements(
container.getResources(),
100
100,
1000
);
// requests and limits will only have 2 items, cpu / memory
assertEquals(2, result.getLimits().size());
@ -591,7 +653,8 @@ class K8sTaskAdapterTest
container.setResources(new ResourceRequirementsBuilder().withRequests(requestMap).withLimits(limitMap).build());
ResourceRequirements ephemeralResult = K8sTaskAdapter.getResourceRequirements(
container.getResources(),
100
100,
1000
);
// you will have ephemeral storage as well.
assertEquals(3, ephemeralResult.getLimits().size());
@ -609,7 +672,8 @@ class K8sTaskAdapterTest
container.getResources().setAdditionalProperty("additional", "some-value");
ResourceRequirements additionalProperties = K8sTaskAdapter.getResourceRequirements(
container.getResources(),
100
100,
1000
);
assertEquals(1, additionalProperties.getAdditionalProperties().size());
}

View File

@ -108,9 +108,12 @@ class MultiContainerTaskAdapterTest
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
task,
new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new PeonCommandContext(
Collections.singletonList(
"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutput.yaml", Job.class);
@ -159,9 +162,12 @@ class MultiContainerTaskAdapterTest
Job actual = adapter.createJobFromPodSpec(
spec,
task,
new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new PeonCommandContext(
Collections.singletonList(
"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutputOrder.yaml", Job.class);
@ -212,9 +218,12 @@ class MultiContainerTaskAdapterTest
Job actual = adapter.createJobFromPodSpec(
spec,
task,
new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new PeonCommandContext(
Collections.singletonList(
"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedPodSpec.yaml", Job.class);

View File

@ -110,7 +110,8 @@ class SingleContainerTaskAdapterTest
new PeonCommandContext(
Collections.singletonList("foo && bar"),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
config.getCpuCoreInMicro()
)
);

View File

@ -0,0 +1,65 @@
apiVersion: "batch/v1"
kind: "Job"
metadata:
annotations:
task.id: "id"
tls.enabled: "false"
labels:
druid.k8s.peons: "true"
name: "id-3e70afe5cd823dfc7dd308eea616426b"
spec:
activeDeadlineSeconds: 14400
backoffLimit: 0
template:
metadata:
annotations:
task.id: "id"
tls.enabled: "false"
labels:
druid.k8s.peons: "true"
spec:
containers:
- args:
- "foo && bar"
command:
- "sh"
- "-c"
env:
- name: "druid_monitoring_monitors"
value: "[\"org.apache.druid.java.util.metrics.JvmMonitor\", \"org.apache.druid.server.metrics.TaskCountStatsMonitor\"\
]"
- name: "TASK_DIR"
value: "/tmp"
- name: "TASK_JSON"
value: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA="
- name: "JAVA_OPTS"
value: "-Xms1G -Xmx2G -XX:MaxDirectMemorySize=3G"
- name: "druid_host"
valueFrom:
fieldRef:
fieldPath: "status.podIP"
- name: "HOSTNAME"
valueFrom:
fieldRef:
fieldPath: "metadata.name"
image: "one"
name: "main"
ports:
- containerPort: 8091
name: "druid-tls-port"
protocol: "TCP"
- containerPort: 8100
name: "druid-port"
protocol: "TCP"
resources:
limits:
memory: "6000000000"
cpu: "2000m"
ephemeral-storage: 10Gi
requests:
memory: "6000000000"
cpu: "2000m"
ephemeral-storage: 1Gi
hostname: "id-3e70afe5cd823dfc7dd308eea616426b"
restartPolicy: "Never"
ttlSecondsAfterFinished: 172800