mirror of https://github.com/apache/druid.git
JavaScript Worker Select Strategy
this PR adds a JavaScriptWorkerSelectStrategy which allows defining arbitrary logic for selecting workers to run task using a JavaScript function. This gives users full control to implement complex worker selection strategies based on task attributes. more tests and a complex javascript config fix for java8 modify for nashorn compatibility
This commit is contained in:
parent
f7a7daeff9
commit
fb4052d577
|
@ -147,7 +147,7 @@ Issuing a GET request at the same URL will return the current worker config spec
|
|||
|
||||
|Property|Description|Default|
|
||||
|--------|-----------|-------|
|
||||
|`selectStrategy`|How to assign tasks to middlemanagers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, and `equalDistribution`.|fillCapacity|
|
||||
|`selectStrategy`|How to assign tasks to middlemanagers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution` and `javascript`.|fillCapacity|
|
||||
|`autoScaler`|Only used if autoscaling is enabled. See below.|null|
|
||||
|
||||
To view the audit history of worker config issue a GET request to the URL -
|
||||
|
@ -187,6 +187,33 @@ The workers with the least amount of tasks is assigned the task.
|
|||
|--------|-----------|-------|
|
||||
|`type`|`equalDistribution`.|fillCapacity|
|
||||
|
||||
##### Javascript
|
||||
|
||||
Allows defining arbitrary logic for selecting the worker that runs a task, using a JavaScript function.
|
||||
The function is passed the remoteTaskRunnerConfig, a map of workerId to available worker, and the task to be executed. It returns the workerId on which the task should be run, or null if the task cannot be run.
|
||||
It can be used for rapid development of missing features where the worker selection logic is to be changed or tuned often.
|
||||
If the selection logic is quite complex and cannot be easily tested in a JavaScript environment,
|
||||
it's better to write a Druid extension module that extends the existing worker selection strategies written in Java.
|
||||
|
||||
|
||||
|Property|Description|Default|
|
||||
|--------|-----------|-------|
|
||||
|`type`|`javascript`.|javascript|
|
||||
|`function`|String representing javascript function||
|
||||
|
||||
Example: a function that sends tasks of type `index_hadoop` to workers 10.0.0.1 and 10.0.0.2, and all other tasks to the other available workers.
|
||||
|
||||
```
|
||||
{
|
||||
"type":"javascript",
|
||||
"function":"function (config, zkWorkers, task) {\nvar batch_workers = new java.util.ArrayList();\nbatch_workers.add(\"10.0.0.1\");\nbatch_workers.add(\"10.0.0.2\");\nworkers = zkWorkers.keySet().toArray();\nvar sortedWorkers = new Array()\n;for(var i = 0; i < workers.length; i++){\n sortedWorkers[i] = workers[i];\n}\nArray.prototype.sort.call(sortedWorkers,function(a, b){return zkWorkers.get(b).getCurrCapacityUsed() - zkWorkers.get(a).getCurrCapacityUsed();});\nvar minWorkerVer = config.getMinWorkerVersion();\nfor (var i = 0; i < sortedWorkers.length; i++) {\n var worker = sortedWorkers[i];\n var zkWorker = zkWorkers.get(worker);\n if(zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)){\n if(task.getType() == 'index_hadoop' && batch_workers.contains(worker)){\n return worker;\n } else {\n if(task.getType() != 'index_hadoop' && !batch_workers.contains(worker)){\n return worker;\n }\n }\n }\n}\nreturn null;\n}"
|
||||
}
|
||||
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
#### Autoscaler
|
||||
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord.setup;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||
import java.util.Map;
|
||||
import javax.script.Compilable;
|
||||
import javax.script.Invocable;
|
||||
import javax.script.ScriptEngine;
|
||||
import javax.script.ScriptEngineManager;
|
||||
import javax.script.ScriptException;
|
||||
|
||||
public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy
|
||||
{
|
||||
public static interface SelectorFunction
|
||||
{
|
||||
public String apply(RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task);
|
||||
}
|
||||
|
||||
private final SelectorFunction fnSelector;
|
||||
private final String function;
|
||||
|
||||
@JsonCreator
|
||||
public JavaScriptWorkerSelectStrategy(@JsonProperty("function") String fn)
|
||||
{
|
||||
Preconditions.checkNotNull(fn, "function must not be null");
|
||||
final ScriptEngine engine = new ScriptEngineManager().getEngineByName("javascript");
|
||||
try {
|
||||
((Compilable) engine).compile("var apply = " + fn).eval();
|
||||
}
|
||||
catch (ScriptException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
this.function = fn;
|
||||
this.fnSelector = ((Invocable) engine).getInterface(SelectorFunction.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ImmutableZkWorker> findWorkerForTask(
|
||||
RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
|
||||
)
|
||||
{
|
||||
String worker = fnSelector.apply(config, zkWorkers, task);
|
||||
return Optional.fromNullable(worker == null ? null : zkWorkers.get(worker));
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getFunction()
|
||||
{
|
||||
return function;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
JavaScriptWorkerSelectStrategy that = (JavaScriptWorkerSelectStrategy) o;
|
||||
|
||||
if (function != null ? !function.equals(that.function) : that.function != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return function.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "JavaScriptWorkerSelectStrategy{" +
|
||||
"function='" + function + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -32,7 +32,8 @@ import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
|||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class),
|
||||
@JsonSubTypes.Type(name = "fillCapacityWithAffinity", value = FillCapacityWithAffinityWorkerSelectStrategy.class),
|
||||
@JsonSubTypes.Type(name = "equalDistribution", value = EqualDistributionWorkerSelectStrategy.class)
|
||||
@JsonSubTypes.Type(name = "equalDistribution", value = EqualDistributionWorkerSelectStrategy.class),
|
||||
@JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class)
|
||||
})
|
||||
public interface WorkerSelectStrategy
|
||||
{
|
||||
|
|
|
@ -0,0 +1,202 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord.setup;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
||||
import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import org.easymock.EasyMock;
|
||||
import org.joda.time.Period;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class JavaScriptWorkerSelectStrategyTest
|
||||
{
|
||||
|
||||
final JavaScriptWorkerSelectStrategy strategy = new JavaScriptWorkerSelectStrategy(
|
||||
"function (config, zkWorkers, task) {\n"
|
||||
+ "var batch_workers = new java.util.ArrayList();\n"
|
||||
+ "batch_workers.add(\"10.0.0.1\");\n"
|
||||
+ "batch_workers.add(\"10.0.0.2\");\n"
|
||||
+ "workers = zkWorkers.keySet().toArray();\n"
|
||||
+ "var sortedWorkers = new Array()\n;"
|
||||
+ "for(var i = 0; i < workers.length; i++){\n"
|
||||
+ " sortedWorkers[i] = workers[i];\n"
|
||||
+ "}\n"
|
||||
+ "Array.prototype.sort.call(sortedWorkers,function(a, b){return zkWorkers.get(b).getCurrCapacityUsed() - zkWorkers.get(a).getCurrCapacityUsed();});\n"
|
||||
+ "var minWorkerVer = config.getMinWorkerVersion();\n"
|
||||
+ "for (var i = 0; i < sortedWorkers.length; i++) {\n"
|
||||
+ " var worker = sortedWorkers[i];\n"
|
||||
+ " var zkWorker = zkWorkers.get(worker);\n"
|
||||
+ " if(zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)){\n"
|
||||
+ " if(task.getType() == 'index_hadoop' && batch_workers.contains(worker)){\n"
|
||||
+ " return worker;\n"
|
||||
+ " } else {\n"
|
||||
+ " if(task.getType() != 'index_hadoop' && !batch_workers.contains(worker)){\n"
|
||||
+ " return worker;\n"
|
||||
+ " }\n"
|
||||
+ " }\n"
|
||||
+ " }\n"
|
||||
+ "}\n"
|
||||
+ "return null;\n"
|
||||
+ "}"
|
||||
);
|
||||
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
{
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
Assert.assertEquals(
|
||||
strategy,
|
||||
mapper.readValue(
|
||||
mapper.writeValueAsString(strategy),
|
||||
JavaScriptWorkerSelectStrategy.class
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindWorkerForTask()
|
||||
{
|
||||
ImmutableZkWorker worker1 = createMockWorker(1, true, true);
|
||||
ImmutableZkWorker worker2 = createMockWorker(1, true, true);
|
||||
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
|
||||
"10.0.0.1", worker1,
|
||||
"10.0.0.3", worker2
|
||||
);
|
||||
|
||||
ImmutableZkWorker workerForBatchTask = strategy.findWorkerForTask(
|
||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||
workerMap,
|
||||
createMockTask("index_hadoop")
|
||||
).get();
|
||||
// batch tasks should be sent to worker1
|
||||
Assert.assertEquals(worker1, workerForBatchTask);
|
||||
|
||||
ImmutableZkWorker workerForOtherTask = strategy.findWorkerForTask(
|
||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||
workerMap,
|
||||
createMockTask("other_type")
|
||||
).get();
|
||||
// all other tasks should be sent to worker2
|
||||
Assert.assertEquals(worker2, workerForOtherTask);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsolationOfBatchWorker()
|
||||
{
|
||||
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
|
||||
"10.0.0.1", createMockWorker(1, true, true),
|
||||
"10.0.0.2", createMockWorker(1, true, true)
|
||||
);
|
||||
Optional<ImmutableZkWorker> workerForOtherTask = strategy.findWorkerForTask(
|
||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||
workerMap,
|
||||
createMockTask("other_type")
|
||||
);
|
||||
Assert.assertFalse(workerForOtherTask.isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoValidWorker()
|
||||
{
|
||||
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
|
||||
"10.0.0.1", createMockWorker(1, true, false),
|
||||
"10.0.0.4", createMockWorker(1, true, false)
|
||||
);
|
||||
Optional<ImmutableZkWorker> workerForBatchTask = strategy.findWorkerForTask(
|
||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||
workerMap,
|
||||
createMockTask("index_hadoop")
|
||||
);
|
||||
Assert.assertFalse(workerForBatchTask.isPresent());
|
||||
|
||||
Optional<ImmutableZkWorker> workerForOtherTask = strategy.findWorkerForTask(
|
||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||
workerMap,
|
||||
createMockTask("otherTask")
|
||||
);
|
||||
// all other tasks should be sent to worker2
|
||||
Assert.assertFalse(workerForOtherTask.isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoWorkerCanRunTask()
|
||||
{
|
||||
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
|
||||
"10.0.0.1", createMockWorker(1, false, true),
|
||||
"10.0.0.4", createMockWorker(1, false, true)
|
||||
);
|
||||
Optional<ImmutableZkWorker> workerForBatchTask = strategy.findWorkerForTask(
|
||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||
workerMap,
|
||||
createMockTask("index_hadoop")
|
||||
);
|
||||
Assert.assertFalse(workerForBatchTask.isPresent());
|
||||
|
||||
Optional<ImmutableZkWorker> workerForOtherTask = strategy.findWorkerForTask(
|
||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||
workerMap,
|
||||
createMockTask("otherTask")
|
||||
);
|
||||
// all other tasks should be sent to worker2
|
||||
Assert.assertFalse(workerForOtherTask.isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFillWorkerCapacity()
|
||||
{
|
||||
// tasks shoudl be assigned to the worker with maximum currCapacity used until its full
|
||||
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
|
||||
"10.0.0.1", createMockWorker(1, true, true),
|
||||
"10.0.0.2", createMockWorker(5, true, true)
|
||||
);
|
||||
Optional<ImmutableZkWorker> workerForBatchTask = strategy.findWorkerForTask(
|
||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||
workerMap,
|
||||
createMockTask("index_hadoop")
|
||||
);
|
||||
Assert.assertEquals(workerMap.get("10.0.0.2"), workerForBatchTask.get());
|
||||
|
||||
}
|
||||
|
||||
private Task createMockTask(String type)
|
||||
{
|
||||
Task mock = EasyMock.createMock(Task.class);
|
||||
EasyMock.expect(mock.getType()).andReturn(type).anyTimes();
|
||||
EasyMock.replay(mock);
|
||||
return mock;
|
||||
}
|
||||
|
||||
private ImmutableZkWorker createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion)
|
||||
{
|
||||
ImmutableZkWorker worker = EasyMock.createMock(ImmutableZkWorker.class);
|
||||
EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class))).andReturn(canRunTask).anyTimes();
|
||||
EasyMock.expect(worker.getCurrCapacityUsed()).andReturn(currCapacityUsed).anyTimes();
|
||||
EasyMock.expect(worker.isValidVersion(EasyMock.anyString())).andReturn(isValidVersion).anyTimes();
|
||||
EasyMock.replay(worker);
|
||||
return worker;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue