Support Dynamic Peon Pod Template Selection in K8s extension (#16510)

* initial commit

* add Javadocs

* refine JSON input config

* more test and fix build

* extract existing behavior as default strategy

* change template mapping fallback

* add docs

* update doc

* fix doc

* address comments

* define Matcher interface

* fix test coverage

* use lower case for endpoint path

* update Json name

* add more tests

* refactoring Selector class
This commit is contained in:
YongGang 2024-06-12 15:27:10 -07:00 committed by GitHub
parent f8645de341
commit 46dbc74053
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1485 additions and 40 deletions

View File

@ -217,6 +217,59 @@ data:
druid.peon.mode=remote
druid.indexer.task.encapsulatedTask=true
```
#### Dynamic Pod Template Selection Config
The Dynamic Pod Template Selection feature enhances the K8s extension by enabling more flexible and dynamic selection of pod templates based on task properties. This process is governed by the `PodTemplateSelectStrategy`. Below are the two strategies implemented:
|Property|Description|Default|
|--------|-----------|-------|
|`TaskTypePodTemplateSelectStrategy`| This strategy selects pod templates based on task type for execution purposes, implementing the behavior that maps templates to specific task types. | true |
|`SelectorBasedPodTemplateSelectStrategy`| This strategy evaluates a series of selectors, known as `selectors`, which are aligned with potential task properties. | false |
`SelectorBasedPodTemplateSelectStrategy`, the strategy implementing this new feature, is based on conditional `selectors` that match against top-level keys from the task payload. Currently, it supports matching based on task context tags, task type, and dataSource. These selectors are ordered in the dynamic configuration, with the first selector given the highest priority during the evaluation process. This means that the selection process uses these ordered conditions to determine a tasks Pod template. The first matching condition immediately determines the Pod template, thereby prioritizing certain configurations over others. If no selector matches, it will fall back to an optional `defaultKey` if configured; if there is still no match, it will use the `base` template.
Example Configuration:
We define two template keys in the configuration—`low-throughput` and `medium-throughput`—each associated with specific task conditions and arranged in a priority order.
- Low Throughput Template: This is the first template evaluated and has the highest priority. Tasks that have a context tag `billingCategory=streaming_ingestion` and a datasource of `wikipedia` will be classified under the `low-throughput` template. This classification directs such tasks to utilize a predefined pod template optimized for low throughput requirements.
- Medium Throughput Template: If a task does not meet the low-throughput criteria, the system will then evaluate it against the next selector in order. In this example, if the task type is index_kafka, it will fall into the `medium-throughput` template.
```
{
"type": "default",
"podTemplateSelectStrategy":
{
"type": "selectorBased",
"selectors": [
{
"selectionKey": "low-throughput",
"context.tags":
{
"billingCategory": ["streaming_ingestion"]
},
"dataSource": ["wikipedia"]
},
{
"selectionKey": "medium-throughput",
"type": ["index_kafka"]
}
],
"defaultKey"" "base"
}
}
```
Task specific pod templates can be specified as the runtime property `druid.indexer.runner.k8s.podTemplate.{template}: /path/to/taskSpecificPodSpec.yaml` where {template} is the matched `selectionKey` of the `podTemplateSelectStrategy` i.e low-throughput.
Similar to Overlord dynamic configuration, the following API endpoints are defined to retrieve and manage dynamic configurations of Pod Template Selection config:
- Get dynamic configuration:
`POST` `/druid/indexer/v1/k8s/taskRunner/executionConfig`
- Update dynamic configuration:
`GET` `/druid/indexer/v1/k8s/taskRunner/executionConfig`
- Get dynamic configuration history:
`GET` `/druid/indexer/v1/k8s/taskRunner/executionConfig/history`
### Properties
|Property| Possible Values | Description |Default|required|

View File

@ -139,6 +139,26 @@
<version>6.7.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.inject</groupId>
<artifactId>jakarta.inject-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>

View File

@ -32,6 +32,8 @@ import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.JacksonConfigProvider;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
@ -49,6 +51,8 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskExecutionConfigResource;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller;
@ -75,6 +79,7 @@ public class KubernetesOverlordModule implements DruidModule
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX, KubernetesAndWorkerTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
JacksonConfigProvider.bind(binder, KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class, null);
PolyBind.createChoice(
binder,
"druid.indexer.runner.type",
@ -98,6 +103,8 @@ public class KubernetesOverlordModule implements DruidModule
.toProvider(RunnerStrategyProvider.class)
.in(LazySingleton.class);
configureTaskLogs(binder);
Jerseys.addResource(binder, KubernetesTaskExecutionConfigResource.class);
}
@Provides

View File

@ -20,6 +20,7 @@
package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.annotations.EscalatedGlobal;
@ -32,6 +33,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
@ -56,6 +58,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final Properties properties;
private final DruidKubernetesClient druidKubernetesClient;
private final ServiceEmitter emitter;
private final Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
private KubernetesTaskRunner runner;
@Inject
@ -69,7 +72,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
TaskConfig taskConfig,
Properties properties,
DruidKubernetesClient druidKubernetesClient,
ServiceEmitter emitter
ServiceEmitter emitter,
Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef
)
{
this.smileMapper = smileMapper;
@ -82,6 +86,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
this.properties = properties;
this.druidKubernetesClient = druidKubernetesClient;
this.emitter = emitter;
this.dynamicConfigRef = dynamicConfigRef;
}
@Override
@ -146,7 +151,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
druidNode,
smileMapper,
properties,
taskLogs
taskLogs,
dynamicConfigRef
);
} else {
return new SingleContainerTaskAdapter(

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import java.util.Objects;
public class DefaultKubernetesTaskRunnerDynamicConfig implements KubernetesTaskRunnerDynamicConfig
{
private final PodTemplateSelectStrategy podTemplateSelectStrategy;
@JsonCreator
public DefaultKubernetesTaskRunnerDynamicConfig(
@JsonProperty("podTemplateSelectStrategy") PodTemplateSelectStrategy podTemplateSelectStrategy
)
{
Preconditions.checkNotNull(podTemplateSelectStrategy);
this.podTemplateSelectStrategy = podTemplateSelectStrategy;
}
@Override
@JsonProperty
public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
{
return podTemplateSelectStrategy;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DefaultKubernetesTaskRunnerDynamicConfig that = (DefaultKubernetesTaskRunnerDynamicConfig) o;
return Objects.equals(podTemplateSelectStrategy, that.podTemplateSelectStrategy);
}
@Override
public int hashCode()
{
return Objects.hashCode(podTemplateSelectStrategy);
}
@Override
public String toString()
{
return "DefaultKubernetesTaskRunnerDynamicConfig{" +
"podTemplateSelectStrategy=" + podTemplateSelectStrategy +
'}';
}
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import com.google.common.collect.ImmutableMap;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.security.AuthorizationUtils;
import org.joda.time.Interval;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
/**
* Resource that manages Kubernetes-specific execution configurations for running tasks.
*
* <p>This class handles the CRUD operations for execution configurations and provides
* endpoints to update, retrieve, and manage the history of these configurations.</p>
*/
@Path("/druid/indexer/v1/k8s/taskrunner/executionconfig")
public class KubernetesTaskExecutionConfigResource
{
private static final Logger log = new Logger(KubernetesTaskExecutionConfigResource.class);
private final JacksonConfigManager configManager;
private final AuditManager auditManager;
private AtomicReference<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef = null;
@Inject
public KubernetesTaskExecutionConfigResource(
final JacksonConfigManager configManager,
final AuditManager auditManager
)
{
this.configManager = configManager;
this.auditManager = auditManager;
}
/**
* Updates the Kubernetes execution configuration.
*
* @param dynamicConfig the new execution configuration to set
* @param req the HTTP servlet request providing context for audit information
* @return a response indicating the success or failure of the update operation
*/
@POST
@Consumes(MediaType.APPLICATION_JSON)
@ResourceFilters(ConfigResourceFilter.class)
public Response setExecutionConfig(
final KubernetesTaskRunnerDynamicConfig dynamicConfig,
@Context final HttpServletRequest req
)
{
final ConfigManager.SetResult setResult = configManager.set(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
dynamicConfig,
AuthorizationUtils.buildAuditInfo(req)
);
if (setResult.isOk()) {
log.info("Updating K8s execution configs: %s", dynamicConfig);
return Response.ok().build();
} else {
return Response.status(Response.Status.BAD_REQUEST).build();
}
}
/**
* Retrieves the history of changes to the Kubernetes execution configuration.
*
* @param interval the time interval for fetching historical data (optional)
* @param count the maximum number of historical entries to fetch (optional)
* @return a response containing a list of audit entries or an error message
*/
@GET
@Path("/history")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(ConfigResourceFilter.class)
public Response getExecutionConfigHistory(
@QueryParam("interval") final String interval,
@QueryParam("count") final Integer count
)
{
Interval theInterval = interval == null ? null : Intervals.of(interval);
if (theInterval == null && count != null) {
try {
List<AuditEntry> executionEntryList = auditManager.fetchAuditHistory(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
count
);
return Response.ok(executionEntryList).build();
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.<String, Object>of("error", e.getMessage()))
.build();
}
}
List<AuditEntry> executionEntryList = auditManager.fetchAuditHistory(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
theInterval
);
return Response.ok(executionEntryList).build();
}
/**
* Retrieves the current execution configuration for tasks running in Kubernetes.
*
* @return a Response object containing the current execution configuration in JSON format.
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(ConfigResourceFilter.class)
public Response getExecutionConfig()
{
if (dynamicConfigRef == null) {
dynamicConfigRef = configManager.watch(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class);
}
return Response.ok(dynamicConfigRef.get()).build();
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* Represents the configuration for task execution within a Kubernetes environment.
* This interface allows for dynamic configuration of task execution strategies based
* on specified behavior strategies.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = DefaultKubernetesTaskRunnerDynamicConfig.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "default", value = DefaultKubernetesTaskRunnerDynamicConfig.class)
})
public interface KubernetesTaskRunnerDynamicConfig
{
String CONFIG_KEY = "k8s.taskrunner.config";
PodTemplateSelectStrategy DEFAULT_STRATEGY = new TaskTypePodTemplateSelectStrategy();
/**
* Retrieves the execution behavior strategy associated with this configuration.
* @return the execution behavior strategy
*/
PodTemplateSelectStrategy getPodTemplateSelectStrategy();
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.fabric8.kubernetes.api.model.PodTemplate;
import org.apache.druid.indexing.common.task.Task;
import java.util.Map;
/**
* Defines a strategy for selecting the Pod template of tasks based on specific conditions.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = TaskTypePodTemplateSelectStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "default", value = TaskTypePodTemplateSelectStrategy.class),
@JsonSubTypes.Type(name = "selectorBased", value = SelectorBasedPodTemplateSelectStrategy.class),
})
public interface PodTemplateSelectStrategy
{
/**
* Determines the appropriate Pod template for a task by evaluating its properties. This selection
* allows for customized resource allocation and management tailored to the task's specific requirements.
*
* @param task The task for which the Pod template is determined.
* @return The selected Pod template. If no matching template is found,
* the method falls back to a base template.
*/
PodTemplate getPodTemplateForTask(Task task, Map<String, PodTemplate> templates);
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.query.DruidMetrics;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Represents a condition-based selector that evaluates whether a given task meets specified criteria.
* The selector uses conditions defined on context tags and task fields to determine if a task matches.
*/
public class Selector
{
private final String selectionKey;
private final Map<String, Set<String>> cxtTagsConditions;
private final Set<String> taskTypeCondition;
private final Set<String> dataSourceCondition;
/**
* Creates a selector with specified conditions for context tags and task fields.
*
* @param selectionKey the identifier representing the outcome when a task matches the conditions
* @param cxtTagsConditions conditions on context tags
* @param taskTypeCondition conditions on task type
* @param dataSourceCondition conditions on task dataSource
*/
@JsonCreator
public Selector(
@JsonProperty("selectionKey") String selectionKey,
@JsonProperty("context.tags") Map<String, Set<String>> cxtTagsConditions,
@JsonProperty("type") Set<String> taskTypeCondition,
@JsonProperty("dataSource") Set<String> dataSourceCondition
)
{
this.selectionKey = selectionKey;
this.cxtTagsConditions = cxtTagsConditions;
this.taskTypeCondition = taskTypeCondition;
this.dataSourceCondition = dataSourceCondition;
}
/**
* Evaluates this selector against a given task.
*
* @param task the task to evaluate
* @return true if the task meets all the conditions specified by this selector, otherwise false
*/
public boolean evaluate(Task task)
{
boolean isMatch = true;
if (cxtTagsConditions != null) {
isMatch = cxtTagsConditions.entrySet().stream().allMatch(entry -> {
String tagKey = entry.getKey();
Set<String> tagValues = entry.getValue();
Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS);
if (tags == null || tags.isEmpty()) {
return false;
}
Object tagValue = tags.get(tagKey);
return tagValue == null ? false : tagValues.contains((String) tagValue);
});
}
if (isMatch && taskTypeCondition != null) {
isMatch = taskTypeCondition.contains(task.getType());
}
if (isMatch && dataSourceCondition != null) {
isMatch = dataSourceCondition.contains(task.getDataSource());
}
return isMatch;
}
@JsonProperty
public String getSelectionKey()
{
return selectionKey;
}
@JsonProperty("context.tags")
public Map<String, Set<String>> getCxtTagsConditions()
{
return cxtTagsConditions;
}
@JsonProperty("type")
public Set<String> getTaskTypeCondition()
{
return taskTypeCondition;
}
@JsonProperty("dataSource")
public Set<String> getDataSourceCondition()
{
return dataSourceCondition;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Selector selector = (Selector) o;
return Objects.equals(selectionKey, selector.selectionKey) && Objects.equals(
cxtTagsConditions,
selector.cxtTagsConditions
) && Objects.equals(taskTypeCondition, selector.taskTypeCondition) && Objects.equals(
dataSourceCondition,
selector.dataSourceCondition
);
}
@Override
public int hashCode()
{
return Objects.hash(selectionKey, cxtTagsConditions, taskTypeCondition, dataSourceCondition);
}
@Override
public String toString()
{
return "Selector{" +
"selectionKey=" + selectionKey +
", context.tags=" + cxtTagsConditions +
", type=" + taskTypeCondition +
", dataSource=" + dataSourceCondition +
'}';
}
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.fabric8.kubernetes.api.model.PodTemplate;
import org.apache.druid.indexing.common.task.Task;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Implements {@link PodTemplateSelectStrategy} by dynamically evaluating a series of selectors.
* Each selector corresponds to a potential task template key.
*/
public class SelectorBasedPodTemplateSelectStrategy implements PodTemplateSelectStrategy
{
@Nullable
private String defaultKey;
private List<Selector> selectors;
@JsonCreator
public SelectorBasedPodTemplateSelectStrategy(
@JsonProperty("selectors") List<Selector> selectors,
@JsonProperty("defaultKey") @Nullable String defaultKey
)
{
Preconditions.checkNotNull(selectors, "selectors");
this.selectors = selectors;
this.defaultKey = defaultKey;
}
/**
* Evaluates the provided task against the set selectors to determine its template.
*
* @param task the task to be checked
* @return the template if a selector matches, otherwise fallback to base template
*/
@Override
public PodTemplate getPodTemplateForTask(Task task, Map<String, PodTemplate> templates)
{
String templateKey = selectors.stream()
.filter(selector -> selector.evaluate(task))
.findFirst()
.map(Selector::getSelectionKey)
.orElse(defaultKey);
return templates.getOrDefault(templateKey, templates.get("base"));
}
@JsonProperty
public List<Selector> getSelectors()
{
return selectors;
}
@Nullable
@JsonProperty
public String getDefaultKey()
{
return defaultKey;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SelectorBasedPodTemplateSelectStrategy that = (SelectorBasedPodTemplateSelectStrategy) o;
return Objects.equals(defaultKey, that.defaultKey) && Objects.equals(selectors, that.selectors);
}
@Override
public int hashCode()
{
return Objects.hash(defaultKey, selectors);
}
@Override
public String toString()
{
return "SelectorBasedPodTemplateSelectStrategy{" +
"selectors=" + selectors +
", defaultKey=" + defaultKey +
'}';
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.fabric8.kubernetes.api.model.PodTemplate;
import org.apache.druid.indexing.common.task.Task;
import java.util.Map;
/**
* This strategy defines how task template is selected based on their type for execution purposes.
*
* This implementation selects pod template by looking at the type of the task,
* making it a straightforward, type-based template selection strategy.
*/
public class TaskTypePodTemplateSelectStrategy implements PodTemplateSelectStrategy
{
@JsonCreator
public TaskTypePodTemplateSelectStrategy()
{
}
@Override
public PodTemplate getPodTemplateForTask(Task task, Map<String, PodTemplate> templates)
{
return templates.getOrDefault(task.getType(), templates.get("base"));
}
@Override
public boolean equals(Object o)
{
return o instanceof TaskTypePodTemplateSelectStrategy;
}
@Override
public int hashCode()
{
return 1; // Any constant will work here
}
@Override
public String toString()
{
return "TaskTypePodTemplateSelectStrategy{" +
'}';
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.k8s.overlord.taskadapter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.EnvVar;
@ -46,6 +47,8 @@ import org.apache.druid.k8s.overlord.common.Base64Compression;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesOverlordUtils;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogs;
@ -83,7 +86,6 @@ public class PodTemplateTaskAdapter implements TaskAdapter
private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.";
private final KubernetesTaskRunnerConfig taskRunnerConfig;
@ -92,6 +94,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
private final ObjectMapper mapper;
private final HashMap<String, PodTemplate> templates;
private final TaskLogs taskLogs;
private final Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
public PodTemplateTaskAdapter(
KubernetesTaskRunnerConfig taskRunnerConfig,
@ -99,7 +102,8 @@ public class PodTemplateTaskAdapter implements TaskAdapter
DruidNode node,
ObjectMapper mapper,
Properties properties,
TaskLogs taskLogs
TaskLogs taskLogs,
Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef
)
{
this.taskRunnerConfig = taskRunnerConfig;
@ -108,6 +112,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
this.mapper = mapper;
this.templates = initializePodTemplates(properties);
this.taskLogs = taskLogs;
this.dynamicConfigRef = dynamicConfigRef;
}
/**
@ -126,7 +131,16 @@ public class PodTemplateTaskAdapter implements TaskAdapter
@Override
public Job fromTask(Task task) throws IOException
{
PodTemplate podTemplate = templates.getOrDefault(task.getType(), templates.get("base"));
PodTemplateSelectStrategy podTemplateSelectStrategy;
KubernetesTaskRunnerDynamicConfig dynamicConfig = dynamicConfigRef.get();
if (dynamicConfig == null || dynamicConfig.getPodTemplateSelectStrategy() == null) {
podTemplateSelectStrategy = KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY;
} else {
podTemplateSelectStrategy = dynamicConfig.getPodTemplateSelectStrategy();
}
PodTemplate podTemplate = podTemplateSelectStrategy.getPodTemplateForTask(task, templates);
if (podTemplate == null) {
throw new ISE("Pod template spec not found for task type [%s]", task.getType());
}
@ -152,7 +166,9 @@ public class PodTemplateTaskAdapter implements TaskAdapter
.endTemplate()
.withActiveDeadlineSeconds(taskRunnerConfig.getTaskTimeout().toStandardDuration().getStandardSeconds())
.withBackoffLimit(0) // druid does not support an external system retrying failed tasks
.withTtlSecondsAfterFinished((int) taskRunnerConfig.getTaskCleanupDelay().toStandardDuration().getStandardSeconds())
.withTtlSecondsAfterFinished((int) taskRunnerConfig.getTaskCleanupDelay()
.toStandardDuration()
.getStandardSeconds())
.endSpec()
.build();
}
@ -320,12 +336,12 @@ public class PodTemplateTaskAdapter implements TaskAdapter
private Map<String, String> getJobAnnotations(KubernetesTaskRunnerConfig config, Task task)
{
return ImmutableMap.<String, String>builder()
.putAll(config.getAnnotations())
.put(DruidK8sConstants.TASK_ID, task.getId())
.put(DruidK8sConstants.TASK_TYPE, task.getType())
.put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
.put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
.build();
.putAll(config.getAnnotations())
.put(DruidK8sConstants.TASK_ID, task.getId())
.put(DruidK8sConstants.TASK_TYPE, task.getType())
.put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
.put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
.build();
}
private String getDruidLabel(String baseLabel)

View File

@ -19,10 +19,15 @@
package org.apache.druid.k8s.overlord;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.ProvisionException;
import com.google.inject.TypeLiteral;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManagerConfig;
import org.apache.druid.guice.ConfigModule;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.annotations.EscalatedGlobal;
@ -33,6 +38,8 @@ import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.server.DruidNode;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
@ -55,6 +62,14 @@ public class KubernetesOverlordModuleTest
private RemoteTaskRunnerFactory remoteTaskRunnerFactory;
@Mock
private HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
@Mock
private ConfigManagerConfig configManagerConfig;
@Mock
private MetadataStorageTablesConfig metadataStorageTablesConfig;
@Mock
private AuditManager auditManager;
@Mock
private MetadataStorageConnector metadataStorageConnector;
private Injector injector;
@Test
@ -111,6 +126,16 @@ public class KubernetesOverlordModuleTest
if (isWorkerTypeHttpRemote) {
binder.bind(HttpRemoteTaskRunnerFactory.class).toInstance(httpRemoteTaskRunnerFactory);
}
binder.bind(
new TypeLiteral<Supplier<ConfigManagerConfig>>()
{
}).toInstance(Suppliers.ofInstance(configManagerConfig));
binder.bind(
new TypeLiteral<Supplier<MetadataStorageTablesConfig>>()
{
}).toInstance(Suppliers.ofInstance(metadataStorageTablesConfig));
binder.bind(AuditManager.class).toInstance(auditManager);
binder.bind(MetadataStorageConnector.class).toInstance(metadataStorageConnector);
},
new ConfigModule(),
new KubernetesOverlordModule()

View File

@ -20,12 +20,14 @@
package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
@ -53,6 +55,7 @@ public class KubernetesTaskRunnerFactoryTest
private DruidKubernetesClient druidKubernetesClient;
@Mock private ServiceEmitter emitter;
@Mock private Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
@Before
public void setup()
@ -90,7 +93,8 @@ public class KubernetesTaskRunnerFactoryTest
taskConfig,
properties,
druidKubernetesClient,
emitter
emitter,
dynamicConfigRef
);
KubernetesTaskRunner expectedRunner = factory.build();
@ -112,7 +116,8 @@ public class KubernetesTaskRunnerFactoryTest
taskConfig,
properties,
druidKubernetesClient,
emitter
emitter,
dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();
@ -139,7 +144,8 @@ public class KubernetesTaskRunnerFactoryTest
taskConfig,
properties,
druidKubernetesClient,
emitter
emitter,
dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();
@ -164,7 +170,8 @@ public class KubernetesTaskRunnerFactoryTest
taskConfig,
props,
druidKubernetesClient,
emitter
emitter,
dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();
@ -194,7 +201,8 @@ public class KubernetesTaskRunnerFactoryTest
taskConfig,
props,
druidKubernetesClient,
emitter
emitter,
dynamicConfigRef
);
Assert.assertThrows(
@ -225,7 +233,8 @@ public class KubernetesTaskRunnerFactoryTest
taskConfig,
props,
druidKubernetesClient,
emitter
emitter,
dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();
@ -250,7 +259,8 @@ public class KubernetesTaskRunnerFactoryTest
taskConfig,
props,
druidKubernetesClient,
emitter
emitter,
dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();
@ -278,7 +288,8 @@ public class KubernetesTaskRunnerFactoryTest
taskConfig,
props,
druidKubernetesClient,
emitter
emitter,
dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
public class DefaultKubernetesTaskRunnerDynamicConfigTest
{
@Test
public void getPodTemplateSelectStrategyTest()
{
PodTemplateSelectStrategy strategy = new TaskTypePodTemplateSelectStrategy();
DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy);
Assert.assertEquals(strategy, config.getPodTemplateSelectStrategy());
}
@Test
public void testSerde() throws Exception
{
final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
PodTemplateSelectStrategy strategy = new TaskTypePodTemplateSelectStrategy();
DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy);
DefaultKubernetesTaskRunnerDynamicConfig config2 = objectMapper.readValue(
objectMapper.writeValueAsBytes(config),
DefaultKubernetesTaskRunnerDynamicConfig.class
);
Assert.assertEquals(config, config2);
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizationUtils;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class KubernetesTaskExecutionConfigResourceTest
{
private JacksonConfigManager configManager;
private AuditManager auditManager;
private HttpServletRequest req;
private KubernetesTaskRunnerDynamicConfig dynamicConfig;
@Before
public void setUp()
{
configManager = EasyMock.createMock(JacksonConfigManager.class);
auditManager = EasyMock.createMock(AuditManager.class);
req = EasyMock.createMock(HttpServletRequest.class);
dynamicConfig = EasyMock.createMock(KubernetesTaskRunnerDynamicConfig.class);
}
@Test
public void setExecutionConfigSuccessfulUpdate()
{
KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
configManager,
auditManager
);
EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes();
EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes();
EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes();
EasyMock.replay(req);
EasyMock.expect(configManager.set(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
dynamicConfig,
AuthorizationUtils.buildAuditInfo(req)
)).andReturn(ConfigManager.SetResult.ok());
EasyMock.replay(configManager, auditManager, dynamicConfig);
Response result = testedResource.setExecutionConfig(dynamicConfig, req);
assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
}
@Test
public void setExecutionConfigFailedUpdate()
{
KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
configManager,
auditManager
);
EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes();
EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes();
EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes();
EasyMock.replay(req);
EasyMock.expect(configManager.set(
KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
dynamicConfig,
AuthorizationUtils.buildAuditInfo(req)
)).andReturn(ConfigManager.SetResult.failure(new RuntimeException()));
EasyMock.replay(configManager, auditManager, dynamicConfig);
Response result = testedResource.setExecutionConfig(dynamicConfig, req);
assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), result.getStatus());
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
public class KubernetesTaskRunnerDynamicConfigTest
{
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testSerde() throws JsonProcessingException
{
String json = "{\n"
+ " \"type\": \"default\",\n"
+ " \"podTemplateSelectStrategy\": {\n"
+ " \"type\": \"default\"\n"
+ " }\n"
+ "}";
KubernetesTaskRunnerDynamicConfig deserialized = jsonMapper.readValue(
json,
KubernetesTaskRunnerDynamicConfig.class
);
PodTemplateSelectStrategy selectStrategy = deserialized.getPodTemplateSelectStrategy();
Assert.assertTrue(selectStrategy instanceof TaskTypePodTemplateSelectStrategy);
json = "{\n"
+ " \"type\": \"default\",\n"
+ " \"podTemplateSelectStrategy\":\n"
+ " {\n"
+ " \"type\": \"selectorBased\",\n"
+ " \"selectors\": [\n"
+ " {\n"
+ " \"selectionKey\": \"low-throughput\",\n"
+ " \"context.tags\":\n"
+ " {\n"
+ " \"billingCategory\": [\"streaming_ingestion\"]\n"
+ " },\n"
+ " \"dataSource\": [\"wikipedia\"]\n"
+ " },\n"
+ " {\n"
+ " \"selectionKey\": \"medium-throughput\",\n"
+ " \"type\": [\"index_kafka\"]\n"
+ " }\n"
+ " ],\n"
+ " \"defaultKey\": \"base\"\n"
+ " }\n"
+ "}";
deserialized = jsonMapper.readValue(json, KubernetesTaskRunnerDynamicConfig.class);
selectStrategy = deserialized.getPodTemplateSelectStrategy();
Assert.assertTrue(selectStrategy instanceof SelectorBasedPodTemplateSelectStrategy);
Assert.assertEquals(2, ((SelectorBasedPodTemplateSelectStrategy) selectStrategy).getSelectors().size());
}
}

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodTemplate;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class SelectorBasedPodTemplateSelectStrategyTest
{
private Map<String, PodTemplate> templates;
@Before
public void setup()
{
templates = ImmutableMap.of(
"mock",
new PodTemplate(null, null, new ObjectMeta()
{
@Override
public String getName()
{
return "mock";
}
}, null),
"no_match",
new PodTemplate(null, null, new ObjectMeta()
{
@Override
public String getName()
{
return "no_match";
}
}, null),
"match",
new PodTemplate(null, null, new ObjectMeta()
{
@Override
public String getName()
{
return "match";
}
}, null),
"base",
new PodTemplate(null, "base", new ObjectMeta()
{
@Override
public String getName()
{
return "base";
}
}, null)
);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerExceptionWhenSelectorsAreNull()
{
new SelectorBasedPodTemplateSelectStrategy(null, null);
}
@Test
public void testGetPodTemplate_ForTask_emptySelectorsFallbackToBaseTemplate()
{
List<Selector> emptySelectors = Collections.emptyList();
SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(emptySelectors, null);
Task task = NoopTask.create();
Assert.assertEquals("base", strategy.getPodTemplateForTask(task, templates).getMetadata().getName());
}
@Test
public void testGetPodTemplate_ForTask_noMatchSelectorsFallbackToBaseTemplateIfNullDefaultKey()
{
Selector noMatchSelector = new MockSelector(false, "mock");
List<Selector> selectors = Collections.singletonList(noMatchSelector);
SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(selectors, null);
Task task = NoopTask.create();
Assert.assertEquals("base", strategy.getPodTemplateForTask(task, templates).getMetadata().getName());
}
@Test
public void testGetPodTemplate_ForTask_noMatchSelectorsFallbackToDefaultKeyTemplate()
{
Selector noMatchSelector = new MockSelector(false, "mock");
List<Selector> selectors = Collections.singletonList(noMatchSelector);
SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(selectors, "match");
Task task = NoopTask.create();
Assert.assertEquals("match", strategy.getPodTemplateForTask(task, templates).getMetadata().getName());
}
@Test
public void testGetPodTemplate_ForTask_withMatchSelectors()
{
Selector noMatchSelector = new MockSelector(
false,
"no_match"
);
Selector matchSelector = new MockSelector(true, "match");
List<Selector> selectors = Lists.newArrayList(
noMatchSelector,
matchSelector
);
SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(selectors, null);
Task task = NoopTask.create();
Assert.assertEquals("match", strategy.getPodTemplateForTask(task, templates).getMetadata().getName());
}
@Test
public void testSerde() throws Exception
{
final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
Map<String, Set<String>> cxtTagsConditions = new HashMap<>();
cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
Selector selector = new Selector(
"TestSelector",
cxtTagsConditions,
Sets.newHashSet(NoopTask.TYPE),
Sets.newHashSet("my_table")
);
SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(
Collections.singletonList(selector), "default");
SelectorBasedPodTemplateSelectStrategy strategy2 = objectMapper.readValue(
objectMapper.writeValueAsBytes(strategy),
SelectorBasedPodTemplateSelectStrategy.class
);
Assert.assertEquals(strategy, strategy2);
}
static class MockSelector extends Selector
{
private final boolean matches;
MockSelector(boolean matches, String name)
{
super(name, null, null, null);
this.matches = matches;
}
@Override
public boolean evaluate(final Task task)
{
return matches;
}
}
}

View File

@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.execution;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class SelectorTest
{
@Test
public void shouldReturnTrueWhenAllTagsAndTasksMatch()
{
String dataSource = "my_table";
Map<String, Set<String>> cxtTagsConditions = new HashMap<>();
cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
Task task = NoopTask.forDatasource(dataSource);
task.addToContext(DruidMetrics.TAGS, ImmutableMap.of("tag1", "tag1value"));
Selector selector = new Selector(
"TestSelector",
cxtTagsConditions,
null,
Sets.newHashSet(dataSource)
);
Assert.assertTrue(selector.evaluate(task));
}
@Test
public void shouldReturnFalseWhenTagsDoNotMatch()
{
String dataSource = "my_table";
Map<String, Set<String>> cxtTagsConditions = new HashMap<>();
cxtTagsConditions.put("nonexistentTag", Sets.newHashSet("tag1value"));
Task task = NoopTask.forDatasource(dataSource);
Selector selector = new Selector(
"TestSelector",
cxtTagsConditions,
null,
Sets.newHashSet(dataSource)
);
Assert.assertFalse(selector.evaluate(task));
}
@Test
public void shouldReturnFalseWhenSomeTagsDoNotMatch()
{
String dataSource = "my_table";
Map<String, Set<String>> cxtTagsConditions = new HashMap<>();
cxtTagsConditions.put("nonexistentTag", Sets.newHashSet("nonexistentTagValue"));
cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
Task task = NoopTask.forDatasource(dataSource);
task.addToContext(DruidMetrics.TAGS, ImmutableMap.of("tag1", "tag1value"));
Selector selector = new Selector(
"TestSelector",
cxtTagsConditions,
null,
Sets.newHashSet(dataSource)
);
Assert.assertFalse(selector.evaluate(task));
}
@Test
public void shouldReturnFalseWhenTaskFieldsDoNotMatch()
{
Map<String, Set<String>> cxtTagsConditions = new HashMap<>();
cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
Task task = NoopTask.forDatasource("another_table");
task.addToContext(DruidMetrics.TAGS, ImmutableMap.of("tag1", "tag1value"));
Selector selector = new Selector(
"TestSelector",
cxtTagsConditions,
null,
Sets.newHashSet("my_table")
);
Assert.assertFalse(selector.evaluate(task));
}
@Test
public void shouldReturnFalseWhenSomeTaskFieldsDoNotMatch()
{
Map<String, Set<String>> cxtTagsConditions = new HashMap<>();
cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
Task task = NoopTask.forDatasource("another_table");
task.addToContext(DruidMetrics.TAGS, ImmutableMap.of("tag1", "tag1value"));
Selector selector = new Selector(
"TestSelector",
cxtTagsConditions,
Sets.newHashSet(NoopTask.TYPE),
Sets.newHashSet("my_table")
);
Assert.assertFalse(selector.evaluate(task));
}
@Test
public void shouldReturnTrueWhenNoConditionsSpecified()
{
Task task = NoopTask.forDatasource("my_table");
task.addToContext(DruidMetrics.TAGS, ImmutableMap.of("tag1", "tag1value"));
Selector selector = new Selector(
"TestSelector",
null,
null,
null
);
Assert.assertTrue(selector.evaluate(task));
}
@Test
public void testSerde() throws Exception
{
final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
Map<String, Set<String>> cxtTagsConditions = new HashMap<>();
cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
Selector selector = new Selector(
"TestSelector",
cxtTagsConditions,
Sets.newHashSet(NoopTask.TYPE),
Sets.newHashSet("my_table")
);
Selector selector2 = objectMapper.readValue(
objectMapper.writeValueAsBytes(selector),
Selector.class
);
Assert.assertEquals(selector, selector2);
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.k8s.overlord.taskadapter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.PodTemplateBuilder;
@ -40,16 +41,20 @@ import org.apache.druid.k8s.overlord.common.Base64Compression;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.execution.Selector;
import org.apache.druid.k8s.overlord.execution.SelectorBasedPodTemplateSelectStrategy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogs;
import org.easymock.EasyMock;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -71,7 +76,8 @@ public class PodTemplateTaskAdapterTest
private TaskConfig taskConfig;
private DruidNode node;
private ObjectMapper mapper;
@Mock private TaskLogs taskLogs;
private TaskLogs taskLogs;
private Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
@BeforeEach
public void setup()
@ -89,6 +95,9 @@ public class PodTemplateTaskAdapterTest
);
mapper = new TestUtils().getTestObjectMapper();
podTemplateSpec = K8sTestUtils.fileToResource("basePodTemplate.yaml", PodTemplate.class);
taskLogs = EasyMock.createMock(TaskLogs.class);
dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY);
}
@Test
@ -103,7 +112,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
new Properties(),
taskLogs
taskLogs,
dynamicConfigRef
));
Assert.assertEquals(exception.getMessage(), "Pod template task adapter requires a base pod template to be specified under druid.indexer.runner.k8s.podTemplate.base");
}
@ -125,11 +135,11 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
));
Assert.assertTrue(exception.getMessage().contains("Failed to load pod template file for"));
}
@Test
@ -147,7 +157,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@ -180,7 +191,8 @@ public class PodTemplateTaskAdapterTest
),
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@ -207,7 +219,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
));
}
@ -227,7 +240,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@ -253,7 +267,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Task task = new NoopTask(
@ -286,7 +301,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class);
@ -309,7 +325,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Job job = new JobBuilder()
.editSpec().editTemplate().editMetadata()
@ -333,7 +350,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Job job = new JobBuilder()
.editSpec().editTemplate().editMetadata()
@ -357,7 +375,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Job job = new JobBuilder()
.editSpec().editTemplate().editMetadata()
@ -383,7 +402,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Job baseJob = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class);
@ -415,7 +435,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class);
@ -446,7 +467,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
mockTestLogs
mockTestLogs,
dynamicConfigRef
);
Job job = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
@ -470,7 +492,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Task task = new NoopTask(
@ -504,7 +527,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Task task = EasyMock.mock(Task.class);
@ -552,7 +576,8 @@ public class PodTemplateTaskAdapterTest
node,
mapper,
props,
taskLogs
taskLogs,
dynamicConfigRef
);
Task kafkaTask = new NoopTask("id", "id", "datasource", 0, 0, null) {
@ -571,6 +596,51 @@ public class PodTemplateTaskAdapterTest
Assert.assertEquals(0, actual.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
}
@Test
public void test_fromTask_matchPodTemplateBasedOnStrategy() throws IOException
{
String dataSource = "my_table";
Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec);
Path lowThroughputTemplatePath = Files.createFile(tempDir.resolve("low-throughput.yaml"));
PodTemplate lowThroughputPodTemplate = new PodTemplateBuilder(podTemplateSpec)
.editTemplate()
.editSpec()
.setNewVolumeLike(0, new VolumeBuilder().withName("volume").build())
.endVolume()
.endSpec()
.endTemplate()
.build();
mapper.writeValue(lowThroughputTemplatePath.toFile(), lowThroughputPodTemplate);
Properties props = new Properties();
props.setProperty("druid.indexer.runner.k8s.podTemplate.base", baseTemplatePath.toString());
props.setProperty("druid.indexer.runner.k8s.podTemplate.lowThroughput", lowThroughputTemplatePath.toString());
dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(new SelectorBasedPodTemplateSelectStrategy(
Collections.singletonList(
new Selector("lowThrougput", null, null, Sets.newSet(dataSource)
)), null));
PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
taskRunnerConfig,
taskConfig,
node,
mapper,
props,
taskLogs,
dynamicConfigRef
);
Task taskWithMatchedDatasource = new NoopTask("id", "id", dataSource, 0, 0, null);
Task noopTask = new NoopTask("id", "id", "datasource", 0, 0, null);
Job actual = adapter.fromTask(taskWithMatchedDatasource);
Assert.assertEquals(1, actual.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
actual = adapter.fromTask(noopTask);
Assert.assertEquals(0, actual.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
}
private void assertJobSpecsEqual(Job actual, Job expected) throws IOException
{
Map<String, String> actualAnnotations = actual.getSpec().getTemplate().getMetadata().getAnnotations();