Ability to send task types to k8s or worker task runner (#15196)

* Ability to send task types to k8s or worker task runner

* add more tests

* use runnerStrategy to determine task runner

* minor refine

* refine runner strategy config

* move workerType config to upper level

* validate config when application start
This commit is contained in:
YongGang 2023-10-25 09:55:56 -07:00 committed by GitHub
parent 207398a47d
commit 7a25ee4fd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 587 additions and 55 deletions

View File

@ -284,6 +284,9 @@ To do this, set the following property.
|Property| Possible Values |Description|Default|required|
|--------|-----------------|-----------|-------|--------|
|`druid.indexer.runner.k8sAndWorker.workerTaskRunnerType`|`String`|Determines whether the `httpRemote` or the `remote` task runner should be used in addition to the Kubernetes task runner.|`httpRemote`|No|
|`druid.indexer.runner.k8sAndWorker.sendAllTasksToWorkerTaskRunner`|`boolean`| Whether to send all the tasks to the worker task runner. If this is set to false all tasks will be sent to Kubernetes|`false`|No|
|`druid.indexer.runner.k8sAndWorker.runnerStrategy.type`| `String` (e.g., `k8s`, `worker`, `taskType`)| Defines the strategy for task runner selection. |`k8s`|No|
|`druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType`| `String` (e.g., `httpRemote`, `remote`)| Specifies the variant of the worker task runner to be utilized.|`httpRemote`|No|
| **For `taskType` runner strategy:**|||||
|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.default`| `String` (e.g., `k8s`, `worker`) | Specifies the default runner to use if no overrides apply. This setting ensures there is always a fallback runner available.|None|No|
|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.overrides`| `JsonObject`(e.g., `{"index_kafka": "worker"}`)| Defines task-specific overrides for runner types. Each entry sets a task type to a specific runner, allowing fine control. |`{}`|No|

View File

@ -38,6 +38,7 @@ import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
import org.apache.druid.tasklogs.TaskLogStreamer;
import javax.annotation.Nullable;
@ -57,17 +58,17 @@ public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTas
{
private final KubernetesTaskRunner kubernetesTaskRunner;
private final WorkerTaskRunner workerTaskRunner;
private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig;
private final RunnerStrategy runnerStrategy;
public KubernetesAndWorkerTaskRunner(
KubernetesTaskRunner kubernetesTaskRunner,
WorkerTaskRunner workerTaskRunner,
KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
RunnerStrategy runnerStrategy
)
{
this.kubernetesTaskRunner = kubernetesTaskRunner;
this.workerTaskRunner = workerTaskRunner;
this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig;
this.runnerStrategy = runnerStrategy;
}
@Override
@ -101,7 +102,8 @@ public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTas
@Override
public ListenableFuture<TaskStatus> run(Task task)
{
if (kubernetesAndWorkerTaskRunnerConfig.isSendAllTasksToWorkerTaskRunner()) {
RunnerStrategy.RunnerType runnerType = runnerStrategy.getRunnerTypeForTask(task);
if (RunnerStrategy.RunnerType.WORKER_RUNNER_TYPE.equals(runnerType)) {
return workerTaskRunner.run(task);
} else {
return kubernetesTaskRunner.run(task);

View File

@ -26,56 +26,49 @@ import org.apache.commons.lang3.ObjectUtils;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import javax.validation.constraints.NotNull;
import javax.annotation.Nullable;
public class KubernetesAndWorkerTaskRunnerConfig
{
private static final String DEFAULT_WORKER_TASK_RUNNER_TYPE = HttpRemoteTaskRunnerFactory.TYPE_NAME;
/**
* Select which worker task runner to use in addition to the Kubernetes runner, options are httpRemote or remote.
*/
@JsonProperty
@NotNull
private final String workerTaskRunnerType;
private final String RUNNER_STRATEGY_TYPE = "runnerStrategy.type";
private final String RUNNER_STRATEGY_WORKER_TYPE = "runnerStrategy.workerType";
/**
* Whether or not to send tasks to the worker task runner instead of the Kubernetes runner.
* Select which runner type a task would run on, options are k8s or worker.
*/
@JsonProperty
@NotNull
private final boolean sendAllTasksToWorkerTaskRunner;
@JsonProperty(RUNNER_STRATEGY_TYPE)
private String runnerStrategy;
@JsonProperty(RUNNER_STRATEGY_WORKER_TYPE)
private String workerType;
@JsonCreator
public KubernetesAndWorkerTaskRunnerConfig(
@JsonProperty("workerTaskRunnerType") String workerTaskRunnerType,
@JsonProperty("sendAllTasksToWorkerTaskRunner") Boolean sendAllTasksToWorkerTaskRunner
@JsonProperty(RUNNER_STRATEGY_TYPE) @Nullable String runnerStrategy,
@JsonProperty(RUNNER_STRATEGY_WORKER_TYPE) @Nullable String workerType
)
{
this.workerTaskRunnerType = ObjectUtils.defaultIfNull(
workerTaskRunnerType,
DEFAULT_WORKER_TASK_RUNNER_TYPE
);
this.runnerStrategy = ObjectUtils.defaultIfNull(runnerStrategy, KubernetesTaskRunnerFactory.TYPE_NAME);
this.workerType = ObjectUtils.defaultIfNull(workerType, HttpRemoteTaskRunnerFactory.TYPE_NAME);
Preconditions.checkArgument(
this.workerTaskRunnerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) ||
this.workerTaskRunnerType.equals(RemoteTaskRunnerFactory.TYPE_NAME),
"workerTaskRunnerType must be set to one of (%s, %s)",
this.workerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) ||
this.workerType.equals(RemoteTaskRunnerFactory.TYPE_NAME),
"workerType must be set to one of (%s, %s)",
HttpRemoteTaskRunnerFactory.TYPE_NAME,
RemoteTaskRunnerFactory.TYPE_NAME
);
this.sendAllTasksToWorkerTaskRunner = ObjectUtils.defaultIfNull(
sendAllTasksToWorkerTaskRunner,
false
);
}
public String getWorkerTaskRunnerType()
@JsonProperty(RUNNER_STRATEGY_TYPE)
public String getRunnerStrategy()
{
return workerTaskRunnerType;
return runnerStrategy;
}
public boolean isSendAllTasksToWorkerTaskRunner()
@JsonProperty(RUNNER_STRATEGY_WORKER_TYPE)
public String getWorkerType()
{
return sendAllTasksToWorkerTaskRunner;
return workerType;
}
}

View File

@ -22,7 +22,9 @@ package org.apache.druid.k8s.overlord;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<KubernetesAndWorkerTaskRunner>
@ -33,6 +35,7 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<K
private final HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
private final RemoteTaskRunnerFactory remoteTaskRunnerFactory;
private final KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig;
private final RunnerStrategy runnerStrategy;
private KubernetesAndWorkerTaskRunner runner;
@ -41,13 +44,15 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<K
KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory,
HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory,
RemoteTaskRunnerFactory remoteTaskRunnerFactory,
KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig,
RunnerStrategy runnerStrategy
)
{
this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory;
this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory;
this.remoteTaskRunnerFactory = remoteTaskRunnerFactory;
this.kubernetesAndWorkerTaskRunnerConfig = kubernetesAndWorkerTaskRunnerConfig;
this.runnerStrategy = runnerStrategy;
}
@Override
@ -55,13 +60,19 @@ public class KubernetesAndWorkerTaskRunnerFactory implements TaskRunnerFactory<K
{
runner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunnerFactory.build(),
HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(kubernetesAndWorkerTaskRunnerConfig.getWorkerTaskRunnerType()) ?
httpRemoteTaskRunnerFactory.build() : remoteTaskRunnerFactory.build(),
kubernetesAndWorkerTaskRunnerConfig
getWorkerTaskRunner(),
runnerStrategy
);
return runner;
}
private WorkerTaskRunner getWorkerTaskRunner()
{
String workerType = kubernetesAndWorkerTaskRunnerConfig.getWorkerType();
return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType) ?
httpRemoteTaskRunnerFactory.build() : remoteTaskRunnerFactory.build();
}
@Override
public KubernetesAndWorkerTaskRunner get()
{

View File

@ -20,7 +20,9 @@
package org.apache.druid.k8s.overlord;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import io.fabric8.kubernetes.client.Config;
@ -29,6 +31,7 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.LoadScope;
@ -37,27 +40,35 @@ import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs;
import java.util.Properties;
@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
public class KubernetesOverlordModule implements DruidModule
{
private static final Logger log = new Logger(KubernetesOverlordModule.class);
private static final String K8SANDWORKER_PROPERTIES_PREFIX = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX
+ ".k8sAndWorker";
private static final String RUNNERSTRATEGY_PROPERTIES_FORMAT_STRING = K8SANDWORKER_PROPERTIES_PREFIX
+ ".runnerStrategy.%s";
@Override
public void configure(Binder binder)
{
// druid.indexer.runner.type=k8s
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8sAndWorker", KubernetesAndWorkerTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX, KubernetesAndWorkerTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
PolyBind.createChoice(
binder,
@ -78,6 +89,9 @@ public class KubernetesOverlordModule implements DruidModule
.in(LazySingleton.class);
binder.bind(KubernetesTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(KubernetesAndWorkerTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RunnerStrategy.class)
.toProvider(RunnerStrategyProvider.class)
.in(LazySingleton.class);
configureTaskLogs(binder);
}
@ -116,6 +130,45 @@ public class KubernetesOverlordModule implements DruidModule
return client;
}
private static class RunnerStrategyProvider implements Provider<RunnerStrategy>
{
private KubernetesAndWorkerTaskRunnerConfig runnerConfig;
private Properties props;
private JsonConfigurator configurator;
@Inject
public void inject(
KubernetesAndWorkerTaskRunnerConfig runnerConfig,
Properties props,
JsonConfigurator configurator
)
{
this.runnerConfig = runnerConfig;
this.props = props;
this.configurator = configurator;
}
@Override
public RunnerStrategy get()
{
String runnerStrategy = runnerConfig.getRunnerStrategy();
final String runnerStrategyPropertyBase = StringUtils.format(
RUNNERSTRATEGY_PROPERTIES_FORMAT_STRING,
runnerStrategy
);
final JsonConfigProvider<RunnerStrategy> provider = JsonConfigProvider.of(
runnerStrategyPropertyBase,
RunnerStrategy.class
);
props.put(runnerStrategyPropertyBase + ".type", runnerStrategy);
provider.inject(props, configurator);
return provider.get();
}
}
private void configureTaskLogs(Binder binder)
{
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.runnerstrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.druid.indexing.common.task.Task;
/**
* Implementation of {@link RunnerStrategy} that always selects the Kubernetes runner type for tasks.
*
* <p>This strategy is specific for tasks that are intended to be executed in a Kubernetes environment.
* Regardless of task specifics, this strategy always returns {@link RunnerType#KUBERNETES_RUNNER_TYPE}.
*/
public class KubernetesRunnerStrategy implements RunnerStrategy
{
@JsonCreator
public KubernetesRunnerStrategy()
{
}
@Override
public RunnerType getRunnerTypeForTask(Task task)
{
return RunnerType.KUBERNETES_RUNNER_TYPE;
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.runnerstrategy;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerFactory;
/**
* Strategy interface for selecting the appropriate runner type based on the task spec or specific context conditions.
*
* <p>This interface is part of a strategy pattern and is implemented by different classes that handle
* the logic of selecting a runner type based on various criteria. Each task submitted to the system
* will pass through the strategy implementation to determine the correct runner for execution.
*
* <p>The strategy uses {@link RunnerType} as a standardized way of referring to and managing different types of runners.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = KubernetesRunnerStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "k8s", value = KubernetesRunnerStrategy.class),
@JsonSubTypes.Type(name = "worker", value = WorkerRunnerStrategy.class),
@JsonSubTypes.Type(name = "taskType", value = TaskTypeRunnerStrategy.class)
})
public interface RunnerStrategy
{
String WORKER_NAME = "worker";
/**
* Enumerates the available runner types, each associated with a specific method of task execution.
* These runner types are used by the strategies to make decisions and by the system to route tasks appropriately.
*/
enum RunnerType
{
KUBERNETES_RUNNER_TYPE(KubernetesTaskRunnerFactory.TYPE_NAME),
WORKER_RUNNER_TYPE(WORKER_NAME);
private final String type;
RunnerType(String type)
{
this.type = type;
}
public String getType()
{
return type;
}
}
/**
* Analyzes the task and determines the appropriate runner type for executing it.
*
* @param task The task that needs to be executed.
* @return The runner type deemed most suitable for executing the task.
*/
RunnerType getRunnerTypeForTask(Task task);
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.runnerstrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.task.Task;
import javax.annotation.Nullable;
import java.util.Map;
/**
* Implementation of {@link RunnerStrategy} that allows dynamic selection of runner type based on task type.
*
* <p>This strategy checks each task's type against a set of overrides to determine the appropriate runner type.
* If no override is specified for a task's type, it uses a default runner.
*
* <p>Runner types are determined based on configurations provided at construction, including default runner
* type and specific overrides per task type. This strategy is designed for environments where tasks may require
* different execution environments (e.g., Kubernetes or worker nodes).
*/
public class TaskTypeRunnerStrategy implements RunnerStrategy
{
@Nullable
private final Map<String, String> overrides;
private final RunnerStrategy kubernetesRunnerStrategy = new KubernetesRunnerStrategy();
private WorkerRunnerStrategy workerRunnerStrategy;
private final RunnerStrategy defaultRunnerStrategy;
private final String defaultRunner;
@JsonCreator
public TaskTypeRunnerStrategy(
@JsonProperty("default") String defaultRunner,
@JsonProperty("overrides") @Nullable Map<String, String> overrides
)
{
Preconditions.checkNotNull(defaultRunner);
workerRunnerStrategy = new WorkerRunnerStrategy();
defaultRunnerStrategy = RunnerType.WORKER_RUNNER_TYPE.getType().equals(defaultRunner) ?
workerRunnerStrategy : kubernetesRunnerStrategy;
validate(overrides);
this.defaultRunner = defaultRunner;
this.overrides = overrides;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<String, String> getOverrides()
{
return overrides;
}
@JsonProperty
public String getDefault()
{
return defaultRunner;
}
@Override
public RunnerType getRunnerTypeForTask(Task task)
{
String runnerType = null;
if (overrides != null) {
runnerType = overrides.get(task.getType());
}
RunnerStrategy runnerStrategy = getRunnerSelectStrategy(runnerType);
return runnerStrategy.getRunnerTypeForTask(task);
}
private RunnerStrategy getRunnerSelectStrategy(String runnerType)
{
if (runnerType == null) {
return defaultRunnerStrategy;
}
if (WORKER_NAME.equals(runnerType)) {
return workerRunnerStrategy;
} else {
return kubernetesRunnerStrategy;
}
}
private void validate(Map<String, String> overrides)
{
if (overrides == null) {
return;
}
boolean hasValidRunnerType =
overrides.values().stream().allMatch(v -> RunnerType.WORKER_RUNNER_TYPE.getType().equals(v)
|| RunnerType.KUBERNETES_RUNNER_TYPE.getType().equals(v));
Preconditions.checkArgument(
hasValidRunnerType,
"Invalid config in 'overrides'. Each runner type must be either '%s' or '%s'.",
RunnerType.WORKER_RUNNER_TYPE.getType(),
RunnerType.KUBERNETES_RUNNER_TYPE.getType()
);
}
@Override
public String toString()
{
return "TaskTypeRunnerStrategy{" +
"default=" + defaultRunner +
", overrides=" + overrides +
'}';
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.runnerstrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.druid.indexing.common.task.Task;
/**
* Implementation of {@link RunnerStrategy} that always selects the Worker runner type for tasks.
*
* <p>This strategy is specific for tasks that are intended to be executed in a Worker environment.
* Regardless of task specifics, this strategy always returns {@link RunnerType#WORKER_RUNNER_TYPE}.
*/
public class WorkerRunnerStrategy implements RunnerStrategy
{
@JsonCreator
public WorkerRunnerStrategy()
{
}
@Override
public RunnerType getRunnerTypeForTask(Task task)
{
return RunnerType.WORKER_RUNNER_TYPE;
}
}

View File

@ -20,8 +20,6 @@
package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
@ -39,9 +37,8 @@ public class KubernetesAndWorkerTaskRunnerConfigTest
KubernetesAndWorkerTaskRunnerConfig.class
);
Assert.assertEquals(RemoteTaskRunnerFactory.TYPE_NAME, config.getWorkerTaskRunnerType());
Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());
Assert.assertEquals("worker", config.getRunnerStrategy());
Assert.assertEquals("remote", config.getWorkerType());
}
@Test
@ -49,7 +46,6 @@ public class KubernetesAndWorkerTaskRunnerConfigTest
{
KubernetesAndWorkerTaskRunnerConfig config = new KubernetesAndWorkerTaskRunnerConfig(null, null);
Assert.assertEquals(HttpRemoteTaskRunnerFactory.TYPE_NAME, config.getWorkerTaskRunnerType());
Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());
Assert.assertEquals(KubernetesTaskRunnerFactory.TYPE_NAME, config.getRunnerStrategy());
}
}

View File

@ -22,6 +22,8 @@ package org.apache.druid.k8s.overlord;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy;
import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
@ -45,7 +47,8 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends EasyMockSupport
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig(null, null)
new KubernetesAndWorkerTaskRunnerConfig(null, null),
new WorkerRunnerStrategy()
);
EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null);
@ -63,7 +66,8 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends EasyMockSupport
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig("remote", null)
new KubernetesAndWorkerTaskRunnerConfig(null, "remote"),
new WorkerRunnerStrategy()
);
EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
@ -81,7 +85,8 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends EasyMockSupport
kubernetesTaskRunnerFactory,
httpRemoteTaskRunnerFactory,
remoteTaskRunnerFactory,
new KubernetesAndWorkerTaskRunnerConfig("noop", null)
new KubernetesAndWorkerTaskRunnerConfig(null, "noop"),
new KubernetesRunnerStrategy()
);
EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);

View File

@ -31,6 +31,9 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner;
import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy;
import org.apache.druid.k8s.overlord.runnerstrategy.TaskTypeRunnerStrategy;
import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
@ -67,7 +70,7 @@ public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
runner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
new KubernetesAndWorkerTaskRunnerConfig(null, null)
new KubernetesRunnerStrategy()
);
}
@ -77,7 +80,7 @@ public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
new KubernetesAndWorkerTaskRunnerConfig(null, false)
new KubernetesRunnerStrategy()
);
TaskStatus taskStatus = TaskStatus.success(ID);
EasyMock.expect(kubernetesTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
@ -93,7 +96,7 @@ public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
new KubernetesAndWorkerTaskRunnerConfig(null, true)
new WorkerRunnerStrategy()
);
TaskStatus taskStatus = TaskStatus.success(ID);
EasyMock.expect(workerTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
@ -103,6 +106,33 @@ public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
verifyAll();
}
@Test
public void test_runOnKubernetesOrWorkerBasedOnStrategy() throws ExecutionException, InterruptedException
{
TaskTypeRunnerStrategy runnerStrategy = new TaskTypeRunnerStrategy("k8s", ImmutableMap.of("index_kafka", "worker"));
KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
kubernetesTaskRunner,
workerTaskRunner,
runnerStrategy
);
Task taskMock = EasyMock.createMock(Task.class);
TaskStatus taskStatus = TaskStatus.success(ID);
EasyMock.expect(taskMock.getId()).andReturn(ID).anyTimes();
EasyMock.expect(taskMock.getType()).andReturn("index_kafka").once();
EasyMock.expect(workerTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once();
EasyMock.replay(taskMock, workerTaskRunner);
Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(taskMock).get());
EasyMock.verify(taskMock, workerTaskRunner);
EasyMock.reset(taskMock, workerTaskRunner);
EasyMock.expect(taskMock.getType()).andReturn("compact").once();
EasyMock.expect(kubernetesTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once();
EasyMock.replay(taskMock, kubernetesTaskRunner);
Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(taskMock).get());
EasyMock.verify(taskMock, kubernetesTaskRunner);
}
@Test
public void test_getUsedCapacity()
{

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.runnerstrategy;
import org.apache.druid.indexing.common.task.Task;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(EasyMockRunner.class)
public class KubernetesRunnerStrategyTest extends EasyMockSupport
{
@Mock
Task task;
@Test
public void test_kubernetesRunnerStrategy_returnsCorrectRunnerType()
{
KubernetesRunnerStrategy runnerStrategy = new KubernetesRunnerStrategy();
Assert.assertEquals(RunnerStrategy.RunnerType.KUBERNETES_RUNNER_TYPE, runnerStrategy.getRunnerTypeForTask(task));
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.runnerstrategy;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerFactory;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(EasyMockRunner.class)
public class TaskTypeRunnerStrategyTest extends EasyMockSupport
{
@Mock
Task task;
@Test
public void test_taskTypeRunnerStrategy_returnsCorrectRunnerType()
{
TaskTypeRunnerStrategy runnerStrategy = new TaskTypeRunnerStrategy("k8s", ImmutableMap.of("index_kafka", "worker"));
EasyMock.expect(task.getType()).andReturn("index_kafka");
EasyMock.expectLastCall().once();
EasyMock.expect(task.getType()).andReturn("compact");
EasyMock.expectLastCall().once();
replayAll();
Assert.assertEquals(RunnerStrategy.WORKER_NAME, runnerStrategy.getRunnerTypeForTask(task).getType());
Assert.assertEquals(KubernetesTaskRunnerFactory.TYPE_NAME, runnerStrategy.getRunnerTypeForTask(task).getType());
verifyAll();
}
@Test(expected = IllegalArgumentException.class)
public void test_invalidOverridesConfig_shouldThrowException()
{
new TaskTypeRunnerStrategy(
"k8s",
ImmutableMap.of(
"index_kafka",
"non_exist_runner"
)
);
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.overlord.runnerstrategy;
import org.apache.druid.indexing.common.task.Task;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(EasyMockRunner.class)
public class WorkerRunnerStrategyTest extends EasyMockSupport
{
@Mock
Task task;
@Test
public void test_workerRunnerStrategy_returnsCorrectRunnerType()
{
WorkerRunnerStrategy runnerStrategy = new WorkerRunnerStrategy();
Assert.assertEquals(RunnerStrategy.WORKER_NAME, runnerStrategy.getRunnerTypeForTask(task).getType());
}
}

View File

@ -1,4 +1,4 @@
{
"workerTaskRunnerType": "remote",
"sendAllTasksToWorkerTaskRunner": false
"runnerStrategy.type": "worker",
"runnerStrategy.workerType": "remote"
}