K8s mm less fixes (#14028)

Update Fabric8 version and allow metrics monitors to be overriden
This commit is contained in:
Nicholas Lippis 2023-04-05 12:53:16 -04:00 committed by GitHub
parent ccf48245d7
commit 5810e650d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 433 additions and 175 deletions

View File

@ -81,6 +81,7 @@ Additional Configuration
|`druid.indexer.runner.javaOptsArray`|`JsonArray`|java opts for the task.|`-Xmx1g`|No| |`druid.indexer.runner.javaOptsArray`|`JsonArray`|java opts for the task.|`-Xmx1g`|No|
|`druid.indexer.runner.labels`|`JsonObject`|Additional labels you want to add to peon pod|`{}`|No| |`druid.indexer.runner.labels`|`JsonObject`|Additional labels you want to add to peon pod|`{}`|No|
|`druid.indexer.runner.annotations`|`JsonObject`|Additional annotations you want to add to peon pod|`{}`|No| |`druid.indexer.runner.annotations`|`JsonObject`|Additional annotations you want to add to peon pod|`{}`|No|
|`druid.indexer.runner.peonMonitors`|`JsonArray`|Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord.|`[]`|No|
|`druid.indexer.runner.graceTerminationPeriodSeconds`|`Long`|Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods.|`PT30S` (K8s default)|No| |`druid.indexer.runner.graceTerminationPeriodSeconds`|`Long`|Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods.|`PT30S` (K8s default)|No|
### Gotchas ### Gotchas

View File

@ -104,7 +104,7 @@
<dependency> <dependency>
<groupId>io.fabric8</groupId> <groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-core</artifactId> <artifactId>kubernetes-model-core</artifactId>
<version>5.12.2</version> <version>6.4.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>javax.validation</groupId> <groupId>javax.validation</groupId>
@ -114,12 +114,18 @@
<dependency> <dependency>
<groupId>io.fabric8</groupId> <groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-batch</artifactId> <artifactId>kubernetes-model-batch</artifactId>
<version>5.12.2</version> <version>6.4.1</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client-api</artifactId>
<version>6.4.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.fabric8</groupId> <groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId> <artifactId>kubernetes-client</artifactId>
<version>5.12.2</version> <version>6.4.1</version>
<scope>runtime</scope>
</dependency> </dependency>
<!-- Tests --> <!-- Tests -->
@ -136,7 +142,7 @@
<dependency> <dependency>
<groupId>io.fabric8</groupId> <groupId>io.fabric8</groupId>
<artifactId>kubernetes-server-mock</artifactId> <artifactId>kubernetes-server-mock</artifactId>
<version>5.12.2</version> <version>6.4.1</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -188,7 +188,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
} else { } else {
status = TaskStatus.failure( status = TaskStatus.failure(
task.getId(), task.getId(),
"Task failed %s: " + k8sTaskId "Task failed: " + k8sTaskId
); );
} }
if (completedPhase.getJobDuration().isPresent()) { if (completedPhase.getJobDuration().isPresent()) {
@ -245,6 +245,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
@Override @Override
public void updateStatus(Task task, TaskStatus status) public void updateStatus(Task task, TaskStatus status)
{ {
log.info("Updating task: %s with status %s", task.getId(), status);
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
} }

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import org.joda.time.Period; import org.joda.time.Period;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -87,9 +88,16 @@ public class KubernetesTaskRunnerConfig
// how long to wait for the peon k8s job to launch // how long to wait for the peon k8s job to launch
public Period k8sjobLaunchTimeout = new Period("PT1H"); public Period k8sjobLaunchTimeout = new Period("PT1H");
@JsonProperty
// ForkingTaskRunner inherits the monitors from the MM, in k8s mode
// the peon inherits the monitors from the overlord, so if someone specifies
// a TaskCountStatsMonitor in the overlord for example, the peon process
// fails because it can not inject this monitor in the peon process.
public List<String> peonMonitors = new ArrayList<>();
@JsonProperty @JsonProperty
@NotNull @NotNull
public List<String> javaOptsArray; public List<String> javaOptsArray = new ArrayList<>();
@JsonProperty @JsonProperty
@NotNull @NotNull

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Self;
@ -90,7 +91,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
{ {
DruidKubernetesClient client; DruidKubernetesClient client;
if (kubernetesTaskRunnerConfig.disableClientProxy) { if (kubernetesTaskRunnerConfig.disableClientProxy) {
Config config = Config.autoConfigure(null); Config config = new ConfigBuilder().build();
config.setHttpsProxy(null); config.setHttpsProxy(null);
config.setHttpProxy(null); config.setHttpProxy(null);
client = new DruidKubernetesClient(config); client = new DruidKubernetesClient(config);
@ -143,7 +144,6 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
); );
} else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) { } else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
return new PodTemplateTaskAdapter( return new PodTemplateTaskAdapter(
client,
kubernetesTaskRunnerConfig, kubernetesTaskRunnerConfig,
taskConfig, taskConfig,
druidNode, druidNode,

View File

@ -20,8 +20,9 @@
package org.apache.druid.k8s.overlord.common; package org.apache.druid.k8s.overlord.common;
import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
public class DruidKubernetesClient implements KubernetesClientApi public class DruidKubernetesClient implements KubernetesClientApi
{ {
@ -30,7 +31,7 @@ public class DruidKubernetesClient implements KubernetesClientApi
public DruidKubernetesClient() public DruidKubernetesClient()
{ {
this(Config.autoConfigure(null)); this(new ConfigBuilder().build());
} }
public DruidKubernetesClient(Config config) public DruidKubernetesClient(Config config)
@ -41,8 +42,14 @@ public class DruidKubernetesClient implements KubernetesClientApi
@Override @Override
public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException
{ {
try (KubernetesClient client = new DefaultKubernetesClient(config)) { try (KubernetesClient client = getClient()) {
return executor.executeRequest(client); return executor.executeRequest(client);
} }
} }
@Override
public KubernetesClient getClient()
{
return new KubernetesClientBuilder().withConfig(config).build();
}
} }

View File

@ -25,13 +25,11 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.io.input.ReaderInputStream; import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import java.io.InputStream; import java.io.InputStream;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -78,7 +76,7 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
// launch job // launch job
return clientApi.executeRequest(client -> { return clientApi.executeRequest(client -> {
client.batch().v1().jobs().inNamespace(namespace).create(job); client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName()); K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName());
log.info("Successfully submitted job: %s ... waiting for job to launch", taskId); log.info("Successfully submitted job: %s ... waiting for job to launch", taskId);
// wait until the pod is running or complete or failed, any of those is fine // wait until the pod is running or complete or failed, any of those is fine
@ -106,7 +104,8 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
.inNamespace(namespace) .inNamespace(namespace)
.withName(taskId.getK8sTaskId()) .withName(taskId.getK8sTaskId())
.waitUntilCondition( .waitUntilCondition(
x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null), x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null
&& (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() != null)),
howLong, howLong,
unit unit
); );
@ -117,6 +116,7 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
if (job.getStatus().getSucceeded() != null) { if (job.getStatus().getSucceeded() != null) {
return new JobResponse(job, PeonPhase.SUCCEEDED); return new JobResponse(job, PeonPhase.SUCCEEDED);
} }
log.warn("Task %s failed with status %s", taskId, job.getStatus());
return new JobResponse(job, PeonPhase.FAILED); return new JobResponse(job, PeonPhase.FAILED);
}); });
} }
@ -125,12 +125,12 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
public boolean cleanUpJob(K8sTaskId taskId) public boolean cleanUpJob(K8sTaskId taskId)
{ {
if (!debugJobs) { if (!debugJobs) {
Boolean result = clientApi.executeRequest(client -> client.batch() Boolean result = clientApi.executeRequest(client -> !client.batch()
.v1() .v1()
.jobs() .jobs()
.inNamespace(namespace) .inNamespace(namespace)
.withName(taskId.getK8sTaskId()) .withName(taskId.getK8sTaskId())
.delete()); .delete().isEmpty());
if (result) { if (result) {
log.info("Cleaned up k8s task: %s", taskId); log.info("Cleaned up k8s task: %s", taskId);
} else { } else {
@ -147,23 +147,24 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
@Override @Override
public Optional<InputStream> getPeonLogs(K8sTaskId taskId) public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
{ {
KubernetesClient k8sClient = clientApi.getClient();
try { try {
return clientApi.executeRequest(client -> { LogWatch logWatch = k8sClient.batch()
Reader reader = client.batch() .v1()
.v1() .jobs()
.jobs() .inNamespace(namespace)
.inNamespace(namespace) .withName(taskId.getK8sTaskId())
.withName(taskId.getK8sTaskId()) .inContainer("main")
.inContainer("main") .watchLog();
.getLogReader(); if (logWatch == null) {
if (reader == null) { k8sClient.close();
return Optional.absent(); return Optional.absent();
} }
return Optional.of(new ReaderInputStream(reader, StandardCharsets.UTF_8)); return Optional.of(new LogWatchInputStream(k8sClient, logWatch));
});
} }
catch (Exception e) { catch (Exception e) {
log.error(e, "Error streaming logs from task: %s", taskId); log.error(e, "Error streaming logs from task: %s", taskId);
k8sClient.close();
return Optional.absent(); return Optional.absent();
} }
} }
@ -184,17 +185,17 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
public List<Pod> listPeonPods(Set<PeonPhase> phases) public List<Pod> listPeonPods(Set<PeonPhase> phases)
{ {
return listPeonPods().stream() return listPeonPods().stream()
.filter(x -> phases.contains(PeonPhase.getPhaseFor(x))) .filter(x -> phases.contains(PeonPhase.getPhaseFor(x)))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@Override @Override
public List<Pod> listPeonPods() public List<Pod> listPeonPods()
{ {
PodList podList = clientApi.executeRequest(client -> client.pods().inNamespace(namespace)) return clientApi.executeRequest(client -> client.pods().inNamespace(namespace)
.withLabel(DruidK8sConstants.LABEL_KEY) .withLabel(DruidK8sConstants.LABEL_KEY)
.list(); .list().getItems());
return podList.getItems();
} }
@Override @Override
@ -204,7 +205,12 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
return clientApi.executeRequest(client -> { return clientApi.executeRequest(client -> {
List<Job> jobs = getJobsToCleanup(listAllPeonJobs(), howFarBack, timeUnit); List<Job> jobs = getJobsToCleanup(listAllPeonJobs(), howFarBack, timeUnit);
jobs.forEach(x -> { jobs.forEach(x -> {
if (client.batch().v1().jobs().inNamespace(namespace).withName(x.getMetadata().getName()).delete()) { if (!client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(x.getMetadata().getName())
.delete().isEmpty()) {
numDeleted.incrementAndGet(); numDeleted.incrementAndGet();
} }
}); });
@ -258,5 +264,4 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + k8sTaskId + " not found"); throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + k8sTaskId + " not found");
} }
} }
} }

View File

@ -19,6 +19,7 @@
package org.apache.druid.k8s.overlord.common; package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
@ -119,6 +120,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
@Override @Override
public Task toTask(Pod from) throws IOException public Task toTask(Pod from) throws IOException
{ {
// all i have to do here is grab the main container...done
PodSpec podSpec = from.getSpec(); PodSpec podSpec = from.getSpec();
massageSpec(podSpec, "main"); massageSpec(podSpec, "main");
List<EnvVar> envVars = podSpec.getContainers().get(0).getEnv(); List<EnvVar> envVars = podSpec.getContainers().get(0).getEnv();
@ -199,8 +201,19 @@ public abstract class K8sTaskAdapter implements TaskAdapter
mainContainer.setPorts(Lists.newArrayList(httpsPort, tcpPort)); mainContainer.setPorts(Lists.newArrayList(httpsPort, tcpPort));
} }
protected void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents) @VisibleForTesting
void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents)
throws JsonProcessingException
{ {
// if the peon monitors are set, override the overlord's monitors (if set) with the peon monitors
if (!taskRunnerConfig.peonMonitors.isEmpty()) {
mainContainer.getEnv().removeIf(x -> "druid_monitoring_monitors".equals(x.getName()));
mainContainer.getEnv().add(new EnvVarBuilder()
.withName("druid_monitoring_monitors")
.withValue(mapper.writeValueAsString(taskRunnerConfig.peonMonitors))
.build());
}
mainContainer.getEnv().addAll(Lists.newArrayList( mainContainer.getEnv().addAll(Lists.newArrayList(
new EnvVarBuilder() new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV) .withName(DruidK8sConstants.TASK_DIR_ENV)
@ -234,7 +247,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
PeonCommandContext context, PeonCommandContext context,
long containerSize, long containerSize,
String taskContents String taskContents
) ) throws JsonProcessingException
{ {
// prepend the startup task.json extraction command // prepend the startup task.json extraction command
List<String> mainCommand = Lists.newArrayList("sh", "-c"); List<String> mainCommand = Lists.newArrayList("sh", "-c");

View File

@ -19,8 +19,15 @@
package org.apache.druid.k8s.overlord.common; package org.apache.druid.k8s.overlord.common;
import io.fabric8.kubernetes.client.KubernetesClient;
// Wraps all kubernetes api calls, to ensure you open and close connections properly // Wraps all kubernetes api calls, to ensure you open and close connections properly
public interface KubernetesClientApi public interface KubernetesClientApi
{ {
<T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException; <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException;
// use only when handling streams of data, example if you want to pass around an input stream from a pod,
// then you would call this instead of executeRequest as you would want to keep the connection open until you
// are done with the stream. Callers responsibility to clean up when using this method
KubernetesClient getClient();
} }

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.common;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import java.io.IOException;
import java.io.InputStream;
/**
* This wraps the InputStream for k8s client
* When you call close on the stream, it will also close the open
* http connections and the client
*/
public class LogWatchInputStream extends InputStream
{
private final KubernetesClient client;
private final LogWatch logWatch;
public LogWatchInputStream(KubernetesClient client, LogWatch logWatch)
{
this.client = client;
this.logWatch = logWatch;
}
@Override
public int read() throws IOException
{
return logWatch.getOutput().read();
}
@Override
public void close()
{
logWatch.close();
client.close();
}
}

View File

@ -34,6 +34,7 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodTemplate; import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.utils.Serialization;
import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
@ -47,6 +48,7 @@ import org.apache.druid.server.DruidNode;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -74,7 +76,6 @@ public class PodTemplateTaskAdapter implements TaskAdapter
private static final Logger log = new Logger(PodTemplateTaskAdapter.class); private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s"; private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.%s";
private final KubernetesClientApi client;
private final KubernetesTaskRunnerConfig taskRunnerConfig; private final KubernetesTaskRunnerConfig taskRunnerConfig;
private final TaskConfig taskConfig; private final TaskConfig taskConfig;
private final DruidNode node; private final DruidNode node;
@ -82,7 +83,6 @@ public class PodTemplateTaskAdapter implements TaskAdapter
private final HashMap<String, PodTemplate> templates; private final HashMap<String, PodTemplate> templates;
public PodTemplateTaskAdapter( public PodTemplateTaskAdapter(
KubernetesClientApi client,
KubernetesTaskRunnerConfig taskRunnerConfig, KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig, TaskConfig taskConfig,
DruidNode node, DruidNode node,
@ -90,7 +90,6 @@ public class PodTemplateTaskAdapter implements TaskAdapter
Properties properties Properties properties
) )
{ {
this.client = client;
this.taskRunnerConfig = taskRunnerConfig; this.taskRunnerConfig = taskRunnerConfig;
this.taskConfig = taskConfig; this.taskConfig = taskConfig;
this.node = node; this.node = node;
@ -198,7 +197,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
return Optional.empty(); return Optional.empty();
} }
try { try {
return Optional.of(client.executeRequest(client -> client.v1().podTemplates().load(new File(podTemplateFile)).get())); return Optional.of(Serialization.unmarshal(Files.newInputStream(new File(podTemplateFile).toPath()), PodTemplate.class));
} }
catch (Exception e) { catch (Exception e) {
throw new ISE(e, "Failed to load pod template file for [%s] at [%s]", property, podTemplateFile); throw new ISE(e, "Failed to load pod template file for [%s] at [%s]", property, podTemplateFile);

View File

@ -59,6 +59,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
// must have a kind / minikube cluster installed and the image pushed to your repository // must have a kind / minikube cluster installed and the image pushed to your repository
@Disabled
public class DruidPeonClientIntegrationTest public class DruidPeonClientIntegrationTest
{ {
private StartupLoggingConfig startupLoggingConfig; private StartupLoggingConfig startupLoggingConfig;

View File

@ -22,11 +22,14 @@ package org.apache.druid.k8s.overlord.common;
import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.api.client.util.Joiner;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder; import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.PodSpec;
@ -67,7 +70,7 @@ class K8sTaskAdapterTest
private final StartupLoggingConfig startupLoggingConfig; private final StartupLoggingConfig startupLoggingConfig;
private final TaskConfig taskConfig; private final TaskConfig taskConfig;
private final DruidNode node; private final DruidNode node;
private ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
public K8sTaskAdapterTest() public K8sTaskAdapterTest()
{ {
@ -253,4 +256,75 @@ class K8sTaskAdapterTest
}); });
} }
@Test
void testAddingMonitors() throws IOException
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
PeonCommandContext context = new PeonCommandContext(
new ArrayList<>(),
new ArrayList<>(),
new File("/tmp/")
);
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
testClient,
config,
taskConfig,
startupLoggingConfig,
node,
jsonMapper
);
Task task = K8sTestUtils.getTask();
// no monitor in overlord, no monitor override
Container container = new ContainerBuilder()
.withName("container").build();
adapter.addEnvironmentVariables(container, context, task.toString());
assertFalse(
container.getEnv().stream().anyMatch(x -> x.getName().equals("druid_monitoring_monitors")),
"Didn't match, envs: " + Joiner.on(',').join(container.getEnv())
);
// we have an override, but nothing in the overlord
config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class);
adapter = new SingleContainerTaskAdapter(
testClient,
config,
taskConfig,
startupLoggingConfig,
node,
jsonMapper
);
adapter.addEnvironmentVariables(container, context, task.toString());
EnvVar env = container.getEnv()
.stream()
.filter(x -> x.getName().equals("druid_monitoring_monitors"))
.findFirst()
.get();
assertEquals(jsonMapper.writeValueAsString(config.peonMonitors), env.getValue());
// we override what is in the overlord
config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class);
adapter = new SingleContainerTaskAdapter(
testClient,
config,
taskConfig,
startupLoggingConfig,
node,
jsonMapper
);
container.getEnv().add(new EnvVarBuilder()
.withName("druid_monitoring_monitors")
.withValue(
"'[\"org.apache.druid.java.util.metrics.JvmMonitor\", "
+ "\"org.apache.druid.server.metrics.TaskCountStatsMonitor\"]'")
.build());
adapter.addEnvironmentVariables(container, context, task.toString());
env = container.getEnv()
.stream()
.filter(x -> x.getName().equals("druid_monitoring_monitors"))
.findFirst()
.get();
assertEquals(jsonMapper.writeValueAsString(config.peonMonitors), env.getValue());
}
} }

View File

@ -27,6 +27,7 @@ import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.PodSpecBuilder; import io.fabric8.kubernetes.api.model.PodSpecBuilder;
import io.fabric8.kubernetes.api.model.PodTemplateSpec; import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.utils.Serialization;
import org.apache.commons.text.CharacterPredicates; import org.apache.commons.text.CharacterPredicates;
import org.apache.commons.text.RandomStringGenerator; import org.apache.commons.text.RandomStringGenerator;
import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DimensionsSpec;
@ -142,4 +143,11 @@ public class K8sTestUtils
); );
} }
public static <T> T fileToResource(String contents, Class<T> type)
{
return Serialization.unmarshal(
K8sTestUtils.class.getClassLoader().getResourceAsStream(contents),
type
);
}
} }

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.common;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class LogWatchInputStreamTest
{
@Test
void testFlow() throws IOException
{
LogWatch logWatch = mock(LogWatch.class);
InputStream inputStream = mock(InputStream.class);
when(inputStream.read()).thenReturn(1);
when(logWatch.getOutput()).thenReturn(inputStream);
KubernetesClient client = mock(KubernetesClient.class);
LogWatchInputStream stream = new LogWatchInputStream(client, logWatch);
int result = stream.read();
Assertions.assertEquals(1, result);
verify(inputStream, times(1)).read();
stream.close();
verify(logWatch, times(1)).close();
verify(client, times(1)).close();
}
}

View File

@ -45,6 +45,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List;
@EnableKubernetesMockClient(crud = true) @EnableKubernetesMockClient(crud = true)
class MultiContainerTaskAdapterTest class MultiContainerTaskAdapterTest
@ -100,7 +101,7 @@ class MultiContainerTaskAdapterTest
public void testMultiContainerSupport() throws IOException public void testMultiContainerSupport() throws IOException
{ {
TestKubernetesClient testClient = new TestKubernetesClient(client); TestKubernetesClient testClient = new TestKubernetesClient(client);
Pod pod = client.pods().load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpec.yaml")).get(); Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class);
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig(); KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test"; config.namespace = "test";
MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter( MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
@ -120,11 +121,7 @@ class MultiContainerTaskAdapterTest
new File("/tmp") new File("/tmp")
) )
); );
Job expected = client.batch() Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutput.yaml", Job.class);
.v1()
.jobs()
.load(this.getClass().getClassLoader().getResourceAsStream("expectedMultiContainerOutput.yaml"))
.get();
// something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results, // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results,
// this would never happen in real life, but for the jdk 17 tests this is a problem // this would never happen in real life, but for the jdk 17 tests this is a problem
@ -150,7 +147,7 @@ class MultiContainerTaskAdapterTest
public void testMultiContainerSupportWithNamedContainer() throws IOException public void testMultiContainerSupportWithNamedContainer() throws IOException
{ {
TestKubernetesClient testClient = new TestKubernetesClient(client); TestKubernetesClient testClient = new TestKubernetesClient(client);
Pod pod = client.pods().load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpecOrder.yaml")).get(); Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpecOrder.yaml", Pod.class);
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig(); KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test"; config.namespace = "test";
config.primaryContainerName = "primary"; config.primaryContainerName = "primary";
@ -166,29 +163,73 @@ class MultiContainerTaskAdapterTest
PodSpec spec = pod.getSpec(); PodSpec spec = pod.getSpec();
K8sTaskAdapter.massageSpec(spec, "primary"); K8sTaskAdapter.massageSpec(spec, "primary");
Job actual = adapter.createJobFromPodSpec( Job actual = adapter.createJobFromPodSpec(
spec, spec,
task, task,
new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"), new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(), new ArrayList<>(),
new File("/tmp") new File("/tmp")
) )
); );
Job expected = client.batch() Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutputOrder.yaml", Job.class);
.v1()
.jobs()
.load(this.getClass().getClassLoader().getResourceAsStream("expectedMultiContainerOutputOrder.yaml"))
.get();
// something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results, // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results,
// this would never happen in real life, but for the jdk 17 tests this is a problem // this would never happen in real life, but for the jdk 17 tests this is a problem
// could be related to: https://bugs.openjdk.org/browse/JDK-8081450 // could be related to: https://bugs.openjdk.org/browse/JDK-8081450
actual.getSpec() actual.getSpec()
.getTemplate()
.getSpec()
.getContainers()
.get(0)
.getEnv()
.removeIf(x -> x.getName().equals("TASK_JSON"));
expected.getSpec()
.getTemplate() .getTemplate()
.getSpec() .getSpec()
.getContainers() .getContainers()
.get(0) .get(0)
.getEnv() .getEnv()
.removeIf(x -> x.getName().equals("TASK_JSON")); .removeIf(x -> x.getName().equals("TASK_JSON"));
Assertions.assertEquals(expected, actual);
}
@Test
public void testOverridingPeonMonitors() throws IOException
{
TestKubernetesClient testClient = new TestKubernetesClient(client);
Pod pod = K8sTestUtils.fileToResource("podSpec.yaml", Pod.class);
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test";
config.primaryContainerName = "primary";
config.peonMonitors = jsonMapper.readValue("[\"org.apache.druid.java.util.metrics.JvmMonitor\"]", List.class);
MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient,
config,
taskConfig,
startupLoggingConfig,
druidNode,
jsonMapper);
NoopTask task = NoopTask.create("id", 1);
PodSpec spec = pod.getSpec();
K8sTaskAdapter.massageSpec(spec, config.primaryContainerName);
Job actual = adapter.createJobFromPodSpec(
spec,
task,
new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp")
)
);
Job expected = K8sTestUtils.fileToResource("expectedPodSpec.yaml", Job.class);
// something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results,
// this would never happen in real life, but for the jdk 17 tests this is a problem
// could be related to: https://bugs.openjdk.org/browse/JDK-8081450
actual.getSpec()
.getTemplate()
.getSpec()
.getContainers()
.get(0)
.getEnv()
.removeIf(x -> x.getName().equals("TASK_JSON"));
expected.getSpec() expected.getSpec()
.getTemplate() .getTemplate()
.getSpec() .getSpec()

View File

@ -25,8 +25,6 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodTemplate; import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.NoopTask;
@ -48,13 +46,10 @@ import java.nio.file.Path;
import java.util.Collections; import java.util.Collections;
import java.util.Properties; import java.util.Properties;
@EnableKubernetesMockClient()
public class PodTemplateTaskAdapterTest public class PodTemplateTaskAdapterTest
{ {
@TempDir private Path tempDir; @TempDir private Path tempDir;
private KubernetesClient client;
private KubernetesTaskRunnerConfig taskRunnerConfig; private KubernetesTaskRunnerConfig taskRunnerConfig;
private TestKubernetesClient testClient;
private PodTemplate podTemplateSpec; private PodTemplate podTemplateSpec;
private TaskConfig taskConfig; private TaskConfig taskConfig;
private DruidNode node; private DruidNode node;
@ -64,7 +59,6 @@ public class PodTemplateTaskAdapterTest
public void setup() public void setup()
{ {
taskRunnerConfig = new KubernetesTaskRunnerConfig(); taskRunnerConfig = new KubernetesTaskRunnerConfig();
testClient = new TestKubernetesClient(client);
taskConfig = new TaskConfig( taskConfig = new TaskConfig(
"/tmp", "/tmp",
null, null,
@ -92,14 +86,7 @@ public class PodTemplateTaskAdapterTest
false false
); );
mapper = new TestUtils().getTestObjectMapper(); mapper = new TestUtils().getTestObjectMapper();
podTemplateSpec = client podTemplateSpec = K8sTestUtils.fileToResource("basePodTemplate.yaml", PodTemplate.class);
.v1()
.podTemplates()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("basePodTemplate.yaml")
)
.get();
} }
@Test @Test
@ -109,7 +96,6 @@ public class PodTemplateTaskAdapterTest
"Pod template task adapter requires a base pod template to be specified", "Pod template task adapter requires a base pod template to be specified",
IAE.class, IAE.class,
() -> new PodTemplateTaskAdapter( () -> new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig, taskRunnerConfig,
taskConfig, taskConfig,
node, node,
@ -130,7 +116,6 @@ public class PodTemplateTaskAdapterTest
"Pod template task adapter requires a base pod template to be specified", "Pod template task adapter requires a base pod template to be specified",
ISE.class, ISE.class,
() -> new PodTemplateTaskAdapter( () -> new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig, taskRunnerConfig,
taskConfig, taskConfig,
node, node,
@ -149,7 +134,6 @@ public class PodTemplateTaskAdapterTest
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig, taskRunnerConfig,
taskConfig, taskConfig,
node, node,
@ -168,15 +152,7 @@ public class PodTemplateTaskAdapterTest
null null
); );
Job actual = adapter.fromTask(task); Job actual = adapter.fromTask(task);
Job expected = client Job expected = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
.batch()
.v1()
.jobs()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("expectedNoopJob.yaml")
)
.get();
Assertions.assertEquals(expected, actual); Assertions.assertEquals(expected, actual);
} }
@ -191,7 +167,6 @@ public class PodTemplateTaskAdapterTest
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig, taskRunnerConfig,
taskConfig, taskConfig,
new DruidNode( new DruidNode(
@ -219,15 +194,7 @@ public class PodTemplateTaskAdapterTest
); );
Job actual = adapter.fromTask(task); Job actual = adapter.fromTask(task);
Job expected = client Job expected = K8sTestUtils.fileToResource("expectedNoopJobTlsEnabled.yaml", Job.class);
.batch()
.v1()
.jobs()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("expectedNoopJobTlsEnabled.yaml")
)
.get();
Assertions.assertEquals(expected, actual); Assertions.assertEquals(expected, actual);
} }
@ -244,7 +211,6 @@ public class PodTemplateTaskAdapterTest
props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", noopTemplatePath.toString()); props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", noopTemplatePath.toString());
Assert.assertThrows(ISE.class, () -> new PodTemplateTaskAdapter( Assert.assertThrows(ISE.class, () -> new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig, taskRunnerConfig,
taskConfig, taskConfig,
node, node,
@ -264,7 +230,6 @@ public class PodTemplateTaskAdapterTest
props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", templatePath.toString()); props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig, taskRunnerConfig,
taskConfig, taskConfig,
node, node,
@ -284,15 +249,7 @@ public class PodTemplateTaskAdapterTest
); );
Job actual = adapter.fromTask(task); Job actual = adapter.fromTask(task);
Job expected = client Job expected = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
.batch()
.v1()
.jobs()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("expectedNoopJob.yaml")
)
.get();
Assertions.assertEquals(expected, actual); Assertions.assertEquals(expected, actual);
} }
@ -307,7 +264,6 @@ public class PodTemplateTaskAdapterTest
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig, taskRunnerConfig,
taskConfig, taskConfig,
node, node,
@ -315,13 +271,7 @@ public class PodTemplateTaskAdapterTest
props props
); );
Pod pod = client Pod pod = K8sTestUtils.fileToResource("basePodWithoutAnnotations.yaml", Pod.class);
.pods()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("basePodWithoutAnnotations.yaml")
)
.get();
Assert.assertThrows(IOE.class, () -> adapter.toTask(pod)); Assert.assertThrows(IOE.class, () -> adapter.toTask(pod));
} }
@ -336,7 +286,6 @@ public class PodTemplateTaskAdapterTest
props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig, taskRunnerConfig,
taskConfig, taskConfig,
node, node,
@ -344,13 +293,7 @@ public class PodTemplateTaskAdapterTest
props props
); );
Pod basePod = client Pod basePod = K8sTestUtils.fileToResource("basePodWithoutAnnotations.yaml", Pod.class);
.pods()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("basePodWithoutAnnotations.yaml")
)
.get();
Pod pod = new PodBuilder(basePod) Pod pod = new PodBuilder(basePod)
.editMetadata() .editMetadata()
@ -371,7 +314,6 @@ public class PodTemplateTaskAdapterTest
props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
testClient,
taskRunnerConfig, taskRunnerConfig,
taskConfig, taskConfig,
node, node,
@ -379,13 +321,7 @@ public class PodTemplateTaskAdapterTest
props props
); );
Pod pod = client Pod pod = K8sTestUtils.fileToResource("basePod.yaml", Pod.class);
.pods()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("basePod.yaml")
)
.get();
Task actual = adapter.toTask(pod); Task actual = adapter.toTask(pod);
Task expected = NoopTask.create("id", 1); Task expected = NoopTask.create("id", 1);

View File

@ -99,9 +99,7 @@ class SingleContainerTaskAdapterTest
public void testSingleContainerSupport() throws IOException public void testSingleContainerSupport() throws IOException
{ {
TestKubernetesClient testClient = new TestKubernetesClient(client); TestKubernetesClient testClient = new TestKubernetesClient(client);
Pod pod = client.pods() Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class);
.load(this.getClass().getClassLoader().getResourceAsStream("multiContainerPodSpec.yaml"))
.get();
KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig(); KubernetesTaskRunnerConfig config = new KubernetesTaskRunnerConfig();
config.namespace = "test"; config.namespace = "test";
SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter( SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
@ -122,13 +120,8 @@ class SingleContainerTaskAdapterTest
new File("/tmp") new File("/tmp")
) )
); );
Job expected = client.batch()
.v1() Job expected = K8sTestUtils.fileToResource("expectedSingleContainerOutput.yaml", Job.class);
.jobs()
.load(this.getClass()
.getClassLoader()
.getResourceAsStream("expectedSingleContainerOutput.yaml"))
.get();
// something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results, // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results,
// this would never happen in real life, but for the jdk 17 tests this is a problem // this would never happen in real life, but for the jdk 17 tests this is a problem
// could be related to: https://bugs.openjdk.org/browse/JDK-8081450 // could be related to: https://bugs.openjdk.org/browse/JDK-8081450

View File

@ -36,4 +36,10 @@ public class TestKubernetesClient implements KubernetesClientApi
{ {
return executor.executeRequest(client); return executor.executeRequest(client);
} }
@Override
public KubernetesClient getClient()
{
return client;
}
} }

View File

@ -2,29 +2,50 @@ apiVersion: "batch/v1"
kind: "Job" kind: "Job"
metadata: metadata:
annotations: annotations:
task.id: "task_id" task.id: "id"
tls.enabled: "false" tls.enabled: "false"
labels: labels:
druid.k8s.peons: "true" druid.k8s.peons: "true"
name: "taskid" name: "id"
spec: spec:
activeDeadlineSeconds: 3600 activeDeadlineSeconds: 14400
backoffLimit: 0 backoffLimit: 0
template: template:
metadata: metadata:
annotations: annotations:
task.id: "task_id" task.id: "id"
tls.enabled: "false" tls.enabled: "false"
labels: labels:
druid.k8s.peons: "true" druid.k8s.peons: "true"
spec: spec:
containers: containers:
- args: - args:
- "trap 'touch /usr/share/pod/done' EXIT; mkdir -p ${TASK_DIR}; echo ${TASK_JSON}\ - "/kubexit/kubexit /bin/sh -c \"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6\
\ | base64 -d | gzip -d > ${TASK_DIR}/task.json; " \ 1\""
command: command:
- "sh" - "/bin/sh"
- "-c" - "-c"
env:
- name: "druid_monitoring_monitors"
value: '["org.apache.druid.java.util.metrics.JvmMonitor"]'
- name: "TASK_DIR"
value: "/tmp"
- name: "TASK_JSON"
value: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA="
- name: "JAVA_OPTS"
value: ""
- name: "druid_host"
valueFrom:
fieldRef:
fieldPath: "status.podIP"
- name: "HOSTNAME"
valueFrom:
fieldRef:
fieldPath: "metadata.name"
- name: "KUBEXIT_NAME"
value: "main"
- name: "KUBEXIT_GRAVEYARD"
value: "/graveyard"
image: "one" image: "one"
name: "main" name: "main"
ports: ports:
@ -37,30 +58,50 @@ spec:
resources: resources:
limits: limits:
cpu: "1000m" cpu: "1000m"
memory: "1000000000" memory: "2400000000"
requests: requests:
cpu: "1000m" cpu: "1000m"
memory: "1000000000" memory: "2400000000"
volumeMounts: volumeMounts:
- mountPath: "/usr/share/pod" - mountPath: "/graveyard"
name: "peon-share" name: "graveyard"
- command: - mountPath: "/kubexit"
- "tail -f /dev/null" name: "kubexit"
- args:
- "/kubexit/kubexit /bin/sh -c \"tail -f /dev/null\" || true"
command:
- "/bin/sh"
- "-c"
env:
- name: "KUBEXIT_NAME"
value: "sidecar"
- name: "KUBEXIT_GRAVEYARD"
value: "/graveyard"
- name: "KUBEXIT_DEATH_DEPS"
value: "main"
image: "two" image: "two"
lifecycle:
postStart:
exec:
command:
- "while ! test -f /usr/share/pod/done; do echo 'Waiting for the main\
\ pod to finish...'; sleep 5; done; echo 'Agent pod finished, exiting';\
\ exit 0"
name: "sidecar" name: "sidecar"
volumeMounts: volumeMounts:
- mountPath: "/usr/share/pod" - mountPath: "/graveyard"
name: "peon-share" name: "graveyard"
readOnly: true - mountPath: "/kubexit"
name: "kubexit"
hostname: "id"
initContainers:
- command:
- "cp"
- "/bin/kubexit"
- "/kubexit/kubexit"
image: "karlkfi/kubexit:v0.3.2"
name: "kubexit"
volumeMounts:
- mountPath: "/kubexit"
name: "kubexit"
restartPolicy: "Never" restartPolicy: "Never"
volumes: volumes:
- emptyDir:
medium: "Memory"
name: "graveyard"
- emptyDir: {} - emptyDir: {}
name: "peon-share" name: "kubexit"
ttlSecondsAfterFinished: 7200 ttlSecondsAfterFinished: 172800

View File

@ -901,7 +901,7 @@ name: kubernetes fabric java client
license_category: binary license_category: binary
module: extensions-contrib/kubernetes-overlord-extensions module: extensions-contrib/kubernetes-overlord-extensions
license_name: Apache License version 2.0 license_name: Apache License version 2.0
version: 5.12.2 version: 6.4.1
libraries: libraries:
- io.fabric8: kubernetes-client - io.fabric8: kubernetes-client