Support assigning tasks to run on different categories of MiddleManagers (#7066)

* Support assigning tasks to run on different tiers of MiddleManagers

* address comments

* address comments

* rename tier to category and docs

* doc

* fix doc

* fix spelling errors

* docs
Mingming Qiu 2019-10-18 03:57:19 +08:00 committed by Himanshu
parent d54d2e1627
commit 2c758ef5ff
35 changed files with 1119 additions and 109 deletions

View File

@ -981,6 +981,37 @@ useful if you want work evenly distributed across your MiddleManagers.
|`type`|`equalDistribution`.|required; must be `equalDistribution`|
|`affinityConfig`|[Affinity config](#affinity) object|null (no affinity)|
###### Equal Distribution With Category Spec
This strategy is a variant of `Equal Distribution` that supports a `workerCategorySpec` field instead of `affinityConfig`. By specifying `workerCategorySpec`, you can assign tasks to run on different categories of MiddleManagers based on each task's **taskType** and **dataSource name**. This strategy cannot be used with an `AutoScaler`, since the behavior is undefined.
|Property|Description|Default|
|--------|-----------|-------|
|`type`|`equalDistributionWithCategorySpec`.|required; must be `equalDistributionWithCategorySpec`|
|`workerCategorySpec`|[Worker Category Spec](#workercategoryspec) object|null (no worker category spec)|
Example: tasks of type "index_kafka" run on category **c1** by default, except tasks for dataSource "ds1", which run on category **c2**.
```json
{
"selectStrategy": {
"type": "equalDistributionWithCategorySpec",
"workerCategorySpec": {
"strong": false,
"categoryMap": {
"index_kafka": {
"defaultCategory": "c1",
"categoryAffinity": {
"ds1": "c2"
}
}
}
}
}
}
```
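Because `strong` is false in this example, tasks may still be assigned to MiddleManagers outside categories **c1** and **c2** if the preferred category cannot run them.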
###### Fill Capacity
Tasks are assigned to the worker with the most currently-running tasks at the time the task begins running. This is
@ -995,6 +1026,17 @@ MiddleManagers up to capacity simultaneously, rather than a single MiddleManager
|`type`|`fillCapacity`.|required; must be `fillCapacity`|
|`affinityConfig`|[Affinity config](#affinity) object|null (no affinity)|
###### Fill Capacity With Category Spec
This strategy is a variant of `Fill Capacity` that supports a `workerCategorySpec` field instead of `affinityConfig`. Usage is the same as for the _equalDistributionWithCategorySpec_ strategy. This strategy cannot be used with an `AutoScaler`, since the behavior is undefined.
|Property|Description|Default|
|--------|-----------|-------|
|`type`|`fillCapacityWithCategorySpec`.|required; must be `fillCapacityWithCategorySpec`|
|`workerCategorySpec`|[Worker Category Spec](#workercategoryspec) object|null (no worker category spec)|
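As a sketch, an equivalent fill-capacity configuration (reusing the hypothetical category names **c1** and **c2** and dataSource "ds1" from the example above) might look like:
```json
{
  "selectStrategy": {
    "type": "fillCapacityWithCategorySpec",
    "workerCategorySpec": {
      "strong": false,
      "categoryMap": {
        "index_kafka": {
          "defaultCategory": "c1",
          "categoryAffinity": {
            "ds1": "c2"
          }
        }
      }
    }
  }
}
```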
> Before using the _equalDistributionWithCategorySpec_ and _fillCapacityWithCategorySpec_ strategies, you must upgrade the Overlord and all MiddleManagers to a version that supports this feature.
<a name="javascript-worker-select-strategy"></a>
###### JavaScript
@ -1031,6 +1073,23 @@ field. If not provided, the default is to not use affinity at all.
|`affinity`|JSON object mapping a datasource String name to a list of indexing service MiddleManager host:port String values. Druid doesn't perform DNS resolution, so the 'host' value must match what is configured on the MiddleManager and what the MiddleManager announces itself as (examine the Overlord logs to see what your MiddleManager announces itself as).|{}|
|`strong`|With weak affinity (the default), tasks for a dataSource may be assigned to other MiddleManagers if their affinity-mapped MiddleManagers are not able to run all pending tasks in the queue for that dataSource. With strong affinity, tasks for a dataSource will only ever be assigned to their affinity-mapped MiddleManagers, and will wait in the pending queue if necessary.|false|
###### WorkerCategorySpec
WorkerCategorySpec can be provided to the _equalDistributionWithCategorySpec_ and _fillCapacityWithCategorySpec_ strategies using the "workerCategorySpec"
field. If not provided, the default is to not use a worker category spec at all.
|Property|Description|Default|
|--------|-----------|-------|
|`categoryMap`|A JSON map object mapping a task type String name to a [CategoryConfig](#categoryconfig) object, which lets you specify a category config per task type.|{}|
|`strong`|With a weak workerCategorySpec (the default), tasks for a dataSource may be assigned to other MiddleManagers if the MiddleManagers specified in `categoryMap` are not able to run all pending tasks in the queue for that dataSource. With a strong workerCategorySpec, tasks for a dataSource will only ever be assigned to their specified MiddleManagers, and will wait in the pending queue if necessary.|false|
###### CategoryConfig
|Property|Description|Default|
|--------|-----------|-------|
|`defaultCategory`|The default category for a task type.|null|
|`categoryAffinity`|A JSON map object mapping a datasource String name to a category String name of the MiddleManager. If no category is specified for a datasource, the `defaultCategory` is used. If there is no matching category and `defaultCategory` is also null, tasks can run on any available MiddleManagers.|null|
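For illustration, the following hypothetical `workerCategorySpec` (the task type, dataSource, and category names are placeholders) routes "index_parallel" tasks for dataSource "sales" to category **c2**, routes all other "index_parallel" tasks to **c9**, and leaves task types not listed in `categoryMap` free to run on any MiddleManager:
```json
{
  "strong": false,
  "categoryMap": {
    "index_parallel": {
      "defaultCategory": "c9",
      "categoryAffinity": {
        "sales": "c2"
      }
    }
  }
}
```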
##### Autoscaler
Amazon's EC2 is currently the only supported autoscaler.
@ -1082,6 +1141,7 @@ Middle managers pass their configurations down to their child peons. The MiddleM
|`druid.worker.ip`|The IP of the worker.|localhost|
|`druid.worker.version`|Version identifier for the MiddleManager.|0|
|`druid.worker.capacity`|Maximum number of tasks the MiddleManager can accept.|Number of available processors - 1|
|`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`__default_worker_category`|
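For example, a MiddleManager intended for the (hypothetical) category **c1** would set `druid.worker.category=c1` in its runtime properties; the category-spec select strategies above can then route matching tasks to it.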
#### Peon Processing

View File

@ -37,6 +37,7 @@ import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -477,7 +478,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
private static ImmutableWorkerInfo createDummyWorker(String scheme, String host, int capacity, String version)
{
return new ImmutableWorkerInfo(
new Worker(scheme, host, "-2", capacity, version),
new Worker(scheme, host, "-2", capacity, version, WorkerConfig.DEFAULT_CATEGORY),
0,
new HashSet<>(),
new HashSet<>(),

View File

@ -485,7 +485,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
node.getDruidNode().getHostAndPortToUse(),
((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getIp(),
((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCapacity(),
((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getVersion()
((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getVersion(),
((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCategory()
);
}

View File

@ -183,7 +183,8 @@ public class WorkerHolder
worker.getHost(),
worker.getIp(),
worker.getCapacity(),
""
"",
worker.getCategory()
);
}
w = disabledWorker;

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import javax.annotation.Nullable;
import java.util.Objects;
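/**
 * A worker select strategy that, like {@link EqualDistributionWorkerSelectStrategy}, prefers the eligible worker
 * with the most available capacity, but first restricts the eligible workers to the category configured for the
 * task's type and datasource in the supplied {@link WorkerCategorySpec}. With a strong spec, no worker is selected
 * when the preferred category has no available worker; with a weak spec, selection falls back to all runnable workers.
 */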
public class EqualDistributionWithCategorySpecWorkerSelectStrategy implements WorkerSelectStrategy
{
private final WorkerCategorySpec workerCategorySpec;
@JsonCreator
public EqualDistributionWithCategorySpecWorkerSelectStrategy(
@JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec
)
{
this.workerCategorySpec = workerCategorySpec;
}
@JsonProperty
public WorkerCategorySpec getWorkerCategorySpec()
{
return workerCategorySpec;
}
@Nullable
@Override
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
return WorkerSelectUtils.selectWorker(
task,
zkWorkers,
config,
workerCategorySpec,
EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
);
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final EqualDistributionWithCategorySpecWorkerSelectStrategy that = (EqualDistributionWithCategorySpecWorkerSelectStrategy) o;
return Objects.equals(workerCategorySpec, that.workerCategorySpec);
}
@Override
public int hashCode()
{
return Objects.hash(workerCategorySpec);
}
@Override
public String toString()
{
return "EqualDistributionWithCategorySpecWorkerSelectStrategy{" +
"workerCategorySpec=" + workerCategorySpec +
'}';
}
}

View File

@ -66,7 +66,7 @@ public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrate
);
}
private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
{
return eligibleWorkers.values().stream().max(
Comparator.comparing(ImmutableWorkerInfo::getAvailableCapacity)

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import javax.annotation.Nullable;
import java.util.Objects;
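/**
 * A worker select strategy that, like {@link FillCapacityWorkerSelectStrategy}, prefers the eligible worker with
 * the most capacity currently in use, but first restricts the eligible workers to the category configured for the
 * task's type and datasource in the supplied {@link WorkerCategorySpec}. With a strong spec, no worker is selected
 * when the preferred category has no available worker; with a weak spec, selection falls back to all runnable workers.
 */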
public class FillCapacityWithCategorySpecWorkerSelectStrategy implements WorkerSelectStrategy
{
private final WorkerCategorySpec workerCategorySpec;
@JsonCreator
public FillCapacityWithCategorySpecWorkerSelectStrategy(
@JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec
)
{
this.workerCategorySpec = workerCategorySpec;
}
@JsonProperty
public WorkerCategorySpec getWorkerCategorySpec()
{
return workerCategorySpec;
}
@Nullable
@Override
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
return WorkerSelectUtils.selectWorker(
task,
zkWorkers,
config,
workerCategorySpec,
FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers
);
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final FillCapacityWithCategorySpecWorkerSelectStrategy that = (FillCapacityWithCategorySpecWorkerSelectStrategy) o;
return Objects.equals(workerCategorySpec, that.workerCategorySpec);
}
@Override
public int hashCode()
{
return Objects.hash(workerCategorySpec);
}
@Override
public String toString()
{
return "FillCapacityWithCategorySpecWorkerSelectStrategy{" +
"workerCategorySpec=" + workerCategorySpec +
'}';
}
}

View File

@ -64,7 +64,7 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
);
}
private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
{
return eligibleWorkers.values().stream().max(
Comparator.comparing(ImmutableWorkerInfo::getCurrCapacityUsed)

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
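/**
 * Maps task types to the worker categories (see {@link CategoryConfig}) their tasks should prefer. When
 * {@code strong} is true, tasks that resolve to a preferred category will only be assigned within that category;
 * when false, they may fall back to any runnable worker if the preferred category has no available capacity.
 */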
public class WorkerCategorySpec
{
// key: taskType, value: categoryConfig
private final Map<String, CategoryConfig> categoryMap;
private final boolean strong;
@JsonCreator
public WorkerCategorySpec(
@JsonProperty("categoryMap") Map<String, CategoryConfig> categoryMap,
@JsonProperty("strong") boolean strong
)
{
this.categoryMap = categoryMap == null ? Collections.emptyMap() : categoryMap;
this.strong = strong;
}
@JsonProperty
public Map<String, CategoryConfig> getCategoryMap()
{
return categoryMap;
}
@JsonProperty
public boolean isStrong()
{
return strong;
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final WorkerCategorySpec that = (WorkerCategorySpec) o;
return strong == that.strong &&
Objects.equals(categoryMap, that.categoryMap);
}
@Override
public int hashCode()
{
return Objects.hash(categoryMap, strong);
}
@Override
public String toString()
{
return "WorkerCategorySpec{" +
"categoryMap=" + categoryMap +
", strong=" + strong +
'}';
}
public static class CategoryConfig
{
private final String defaultCategory;
// key: datasource, value: category
private final Map<String, String> categoryAffinity;
@JsonCreator
public CategoryConfig(
@JsonProperty("defaultCategory") String defaultCategory,
@JsonProperty("categoryAffinity") Map<String, String> categoryAffinity
)
{
this.defaultCategory = defaultCategory;
this.categoryAffinity = categoryAffinity == null ? Collections.emptyMap() : categoryAffinity;
}
@JsonProperty
public String getDefaultCategory()
{
return defaultCategory;
}
@JsonProperty
public Map<String, String> getCategoryAffinity()
{
return categoryAffinity;
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final CategoryConfig that = (CategoryConfig) o;
return Objects.equals(defaultCategory, that.defaultCategory) &&
Objects.equals(categoryAffinity, that.categoryAffinity);
}
@Override
public int hashCode()
{
return Objects.hash(defaultCategory, categoryAffinity);
}
@Override
public String toString()
{
return "CategoryConfig{" +
"defaultCategory=" + defaultCategory +
", categoryAffinity=" + categoryAffinity +
'}';
}
}
}

View File

@ -38,7 +38,9 @@ import javax.annotation.Nullable;
@JsonSubTypes.Type(name = "fillCapacityWithAffinity", value = FillCapacityWithAffinityWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "equalDistribution", value = EqualDistributionWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "equalDistributionWithAffinity", value = EqualDistributionWithAffinityWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class)
@JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "fillCapacityWithCategorySpec", value = FillCapacityWithCategorySpecWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "equalDistributionWithCategorySpec", value = EqualDistributionWithCategorySpecWorkerSelectStrategy.class)
})
@PublicApi
public interface WorkerSelectStrategy

View File

@ -57,13 +57,7 @@ public class WorkerSelectUtils
final Function<ImmutableMap<String, ImmutableWorkerInfo>, ImmutableWorkerInfo> workerSelector
)
{
// Workers that could potentially run this task, ignoring affinityConfig.
final Map<String, ImmutableWorkerInfo> runnableWorkers = allWorkers
.values()
.stream()
.filter(worker -> worker.canRunTask(task)
&& worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
.collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
final Map<String, ImmutableWorkerInfo> runnableWorkers = getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig);
if (affinityConfig == null) {
// All runnable workers are valid.
@ -95,6 +89,90 @@ public class WorkerSelectUtils
}
}
/**
* Helper for {@link WorkerSelectStrategy} implementations.
*
* @param allWorkers map of all workers, in the style provided to {@link WorkerSelectStrategy}
* @param workerCategorySpec worker category spec, or null
* @param workerSelector function that receives a list of eligible workers: version is high enough, worker can run
* the task, and worker satisfies the worker category spec. may return null.
*
* @return selected worker from "allWorkers", or null.
*/
@Nullable
public static ImmutableWorkerInfo selectWorker(
final Task task,
final Map<String, ImmutableWorkerInfo> allWorkers,
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
@Nullable final WorkerCategorySpec workerCategorySpec,
final Function<ImmutableMap<String, ImmutableWorkerInfo>, ImmutableWorkerInfo> workerSelector
)
{
final Map<String, ImmutableWorkerInfo> runnableWorkers = getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig);
// select worker according to worker category spec
if (workerCategorySpec != null) {
final WorkerCategorySpec.CategoryConfig categoryConfig = workerCategorySpec.getCategoryMap().get(task.getType());
if (categoryConfig != null) {
final String defaultCategory = categoryConfig.getDefaultCategory();
final Map<String, String> categoryAffinity = categoryConfig.getCategoryAffinity();
String preferredCategory = categoryAffinity.get(task.getDataSource());
// If there is no preferred category for the datasource, fall back to the defaultCategory. However, the defaultCategory
// may be null too, so we need to do one more null check (see below).
preferredCategory = preferredCategory == null ? defaultCategory : preferredCategory;
if (preferredCategory != null) {
// select worker from preferred category
final ImmutableMap<String, ImmutableWorkerInfo> categoryWorkers = getCategoryWorkers(preferredCategory, runnableWorkers);
final ImmutableWorkerInfo selected = workerSelector.apply(categoryWorkers);
if (selected != null) {
return selected;
} else if (workerCategorySpec.isStrong()) {
return null;
}
}
}
}
// select worker from all runnable workers by default
return workerSelector.apply(ImmutableMap.copyOf(runnableWorkers));
}
// Get workers that could potentially run this task, ignoring affinityConfig/workerCategorySpec.
private static Map<String, ImmutableWorkerInfo> getRunnableWorkers(
final Task task,
final Map<String, ImmutableWorkerInfo> allWorkers,
final WorkerTaskRunnerConfig workerTaskRunnerConfig
)
{
return allWorkers.values()
.stream()
.filter(worker -> worker.canRunTask(task)
&& worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
.collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
}
/**
* Return the workers that belong to this category.
*
* @param category worker category name
* @param workerMap map of worker hostname to worker info
*
* @return map of worker hostname to worker info
*/
private static ImmutableMap<String, ImmutableWorkerInfo> getCategoryWorkers(
final String category,
final Map<String, ImmutableWorkerInfo> workerMap
)
{
return ImmutableMap.copyOf(
Maps.filterValues(workerMap, workerInfo -> workerInfo.getWorker().getCategory().equals(category))
);
}
/**
* Return workers not assigned to any affinity pool at all.
*

View File

@ -34,6 +34,7 @@ public class Worker
private final String ip;
private final int capacity;
private final String version;
private final String category;
@JsonCreator
public Worker(
@ -41,7 +42,8 @@ public class Worker
@JsonProperty("host") String host,
@JsonProperty("ip") String ip,
@JsonProperty("capacity") int capacity,
@JsonProperty("version") String version
@JsonProperty("version") String version,
@JsonProperty("category") String category
)
{
this.scheme = scheme == null ? "http" : scheme; // needed for backwards compatibility with older workers (pre-#4270)
@ -49,6 +51,7 @@ public class Worker
this.ip = ip;
this.capacity = capacity;
this.version = version;
this.category = category;
}
@JsonProperty
@ -81,6 +84,12 @@ public class Worker
return version;
}
@JsonProperty
public String getCategory()
{
return category;
}
@Override
public boolean equals(Object o)
{
@ -105,7 +114,10 @@ public class Worker
if (!ip.equals(worker.ip)) {
return false;
}
return version.equals(worker.version);
if (!version.equals(worker.version)) {
return false;
}
return category.equals(worker.category);
}
@Override
@ -116,6 +128,7 @@ public class Worker
result = 31 * result + ip.hashCode();
result = 31 * result + capacity;
result = 31 * result + version.hashCode();
result = 31 * result + category.hashCode();
return result;
}
@ -128,6 +141,7 @@ public class Worker
", ip='" + ip + '\'' +
", capacity=" + capacity +
", version='" + version + '\'' +
", category='" + category + '\'' +
'}';
}

View File

@ -91,7 +91,8 @@ public class WorkerResource
enabledWorker.getHost(),
enabledWorker.getIp(),
enabledWorker.getCapacity(),
DISABLED_VERSION
DISABLED_VERSION,
enabledWorker.getCategory()
);
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
workerTaskManager.workerDisabled();

View File

@ -23,6 +23,7 @@ package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
@ -35,7 +36,7 @@ public class ImmutableWorkerInfoTest
{
ImmutableWorkerInfo workerInfo = new ImmutableWorkerInfo(
new Worker(
"http", "testWorker", "192.0.0.1", 10, "v1"
"http", "testWorker", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
@ -56,7 +57,7 @@ public class ImmutableWorkerInfoTest
// Everything equal
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"http", "testWorker", "192.0.0.1", 10, "v1"
"http", "testWorker", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
@ -64,7 +65,7 @@ public class ImmutableWorkerInfoTest
DateTimes.of("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"http", "testWorker", "192.0.0.1", 10, "v1"
"http", "testWorker", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
@ -72,10 +73,10 @@ public class ImmutableWorkerInfoTest
DateTimes.of("2015-01-01T01:01:01Z")
), true);
// different worker same tasks
// same worker different category
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"http", "testWorker1", "192.0.0.1", 10, "v1"
"http", "testWorker", "192.0.0.1", 10, "v1", "c1"
),
2,
ImmutableSet.of("grp1", "grp2"),
@ -83,7 +84,26 @@ public class ImmutableWorkerInfoTest
DateTimes.of("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"http", "testWorker2", "192.0.0.1", 10, "v1"
"http", "testWorker", "192.0.0.1", 10, "v1", "c2"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:01Z")
), false);
// different worker same tasks
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
@ -94,7 +114,7 @@ public class ImmutableWorkerInfoTest
// same worker different task groups
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"http", "testWorker", "192.0.0.1", 10, "v1"
"http", "testWorker", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp3", "grp2"),
@ -102,7 +122,7 @@ public class ImmutableWorkerInfoTest
DateTimes.of("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"http", "testWorker", "192.0.0.1", 10, "v1"
"http", "testWorker", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
@ -113,7 +133,7 @@ public class ImmutableWorkerInfoTest
// same worker different tasks
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"http", "testWorker1", "192.0.0.1", 10, "v1"
"http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
@ -121,7 +141,7 @@ public class ImmutableWorkerInfoTest
DateTimes.of("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"http", "testWorker2", "192.0.0.1", 10, "v1"
"http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
@ -132,7 +152,7 @@ public class ImmutableWorkerInfoTest
// same worker different capacity
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"http", "testWorker1", "192.0.0.1", 10, "v1"
"http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
3,
ImmutableSet.of("grp1", "grp2"),
@ -140,7 +160,7 @@ public class ImmutableWorkerInfoTest
DateTimes.of("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"http", "testWorker2", "192.0.0.1", 10, "v1"
"http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
@ -151,7 +171,7 @@ public class ImmutableWorkerInfoTest
// same worker different lastCompletedTaskTime
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"http", "testWorker1", "192.0.0.1", 10, "v1"
"http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
3,
ImmutableSet.of("grp1", "grp2"),
@ -159,7 +179,7 @@ public class ImmutableWorkerInfoTest
DateTimes.of("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"http", "testWorker2", "192.0.0.1", 10, "v1"
"http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
@ -170,7 +190,7 @@ public class ImmutableWorkerInfoTest
// same worker different blacklistedUntil
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"http", "testWorker1", "192.0.0.1", 10, "v1"
"http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
3,
ImmutableSet.of("grp1", "grp2"),
@ -179,7 +199,7 @@ public class ImmutableWorkerInfoTest
DateTimes.of("2017-07-30")
), new ImmutableWorkerInfo(
new Worker(
"http", "testWorker2", "192.0.0.1", 10, "v1"
"http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),

View File

@ -42,6 +42,7 @@ import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;
@ -146,7 +147,8 @@ public class RemoteTaskRunnerTestUtils
workerId,
workerId,
capacity,
"0"
"0",
WorkerConfig.DEFAULT_CATEGORY
);
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
@ -162,7 +164,14 @@ public class RemoteTaskRunnerTestUtils
{
cf.setData().forPath(
JOINER.join(ANNOUNCEMENTS_PATH, worker.getHost()),
jsonMapper.writeValueAsBytes(new Worker(worker.getScheme(), worker.getHost(), worker.getIp(), worker.getCapacity(), ""))
jsonMapper.writeValueAsBytes(new Worker(
worker.getScheme(),
worker.getHost(),
worker.getIp(),
worker.getCapacity(),
"",
worker.getCategory()
))
);
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.junit.Assert;
import org.junit.Test;
@ -31,7 +32,7 @@ public class TaskRunnerUtilsTest
public void testMakeWorkerURL()
{
final URL url = TaskRunnerUtils.makeWorkerURL(
new Worker("https", "1.2.3.4:8290", "1.2.3.4", 1, "0"),
new Worker("https", "1.2.3.4:8290", "1.2.3.4", 1, "0", WorkerConfig.DEFAULT_CATEGORY),
"/druid/worker/v1/task/%s/log",
"foo bar&"
);

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.http.client.HttpClient;
@ -66,7 +67,7 @@ public class WorkerTaskRunnerQueryAdpaterTest
ImmutableList.of(
new ImmutableWorkerInfo(
new Worker(
"http", "worker-host1", "192.0.0.1", 10, "v1"
"http", "worker-host1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
@ -75,7 +76,7 @@ public class WorkerTaskRunnerQueryAdpaterTest
),
new ImmutableWorkerInfo(
new Worker(
"https", "worker-host2", "192.0.0.2", 4, "v1"
"https", "worker-host2", "192.0.0.2", 4, "v1", WorkerConfig.DEFAULT_CATEGORY
),
1,
ImmutableSet.of("grp1"),

View File

@ -36,6 +36,7 @@ import org.apache.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -577,7 +578,7 @@ public class PendingTaskBasedProvisioningStrategyTest
int capacity
)
{
super(new Worker(scheme, host, ip, capacity, version), null, new DefaultObjectMapper());
super(new Worker(scheme, host, ip, capacity, version, WorkerConfig.DEFAULT_CATEGORY), null, new DefaultObjectMapper());
this.testTask = testTask;
}

View File

@ -34,6 +34,7 @@ import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -507,7 +508,7 @@ public class SimpleProvisioningStrategyTest
String version
)
{
super(new Worker(scheme, host, ip, 3, version), null, new DefaultObjectMapper());
super(new Worker(scheme, host, ip, 3, version, WorkerConfig.DEFAULT_CATEGORY), null, new DefaultObjectMapper());
this.testTask = testTask;
}

View File

@ -46,6 +46,7 @@ import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -143,7 +144,7 @@ public class HttpRemoteTaskRunnerTest
new DruidNode("service", "host1", false, 8080, null, true, false),
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
);
@ -151,7 +152,7 @@ public class HttpRemoteTaskRunnerTest
new DruidNode("service", "host2", false, 8080, null, true, false),
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0")
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
);
@ -240,7 +241,7 @@ public class HttpRemoteTaskRunnerTest
new DruidNode("service", "host1", false, 8080, null, true, false),
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
);
@ -248,7 +249,7 @@ public class HttpRemoteTaskRunnerTest
new DruidNode("service", "host2", false, 8080, null, true, false),
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0")
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
);
@ -344,7 +345,7 @@ public class HttpRemoteTaskRunnerTest
new DruidNode("service", "host", false, 1234, null, true, false),
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
);
@ -489,7 +490,7 @@ public class HttpRemoteTaskRunnerTest
new DruidNode("service", "host", false, 1234, null, true, false),
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
);
@ -663,7 +664,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", false, 1234, null, true, false),
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0"))
ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY))
);
workerHolders.put(
@ -844,7 +845,7 @@ public class HttpRemoteTaskRunnerTest
new DruidNode("service", "host1", false, 8080, null, true, false),
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0")
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", WorkerConfig.DEFAULT_CATEGORY)
)
);
@ -889,7 +890,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", false, 8080, null, true, false),
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0"))
ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY))
);
workerHolders.put(
@ -920,7 +921,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode(
new DruidNode("service", "host3", false, 8080, null, true, false),
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0"))
ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY))
);
workerHolders.put(
@ -966,7 +967,7 @@ public class HttpRemoteTaskRunnerTest
);
WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(workerHolder.getWorker()).andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1")).anyTimes();
EasyMock.expect(workerHolder.getWorker()).andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)).anyTimes();
workerHolder.setLastCompletedTaskTime(EasyMock.anyObject());
workerHolder.resetContinuouslyFailedTasksCount();
EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(0);
@ -1002,7 +1003,7 @@ public class HttpRemoteTaskRunnerTest
// Another "rogue-worker" reports running it, and gets asked to shutdown the task
WorkerHolder rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(rogueWorkerHolder.getWorker())
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1"))
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1", WorkerConfig.DEFAULT_CATEGORY))
.anyTimes();
rogueWorkerHolder.shutdownTask(task.getId());
EasyMock.replay(rogueWorkerHolder);
@ -1017,7 +1018,7 @@ public class HttpRemoteTaskRunnerTest
// "rogue-worker" reports FAILURE for the task, ignored
rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(rogueWorkerHolder.getWorker())
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1"))
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1", WorkerConfig.DEFAULT_CATEGORY))
.anyTimes();
EasyMock.replay(rogueWorkerHolder);
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
@ -1040,7 +1041,7 @@ public class HttpRemoteTaskRunnerTest
// "rogue-worker" reports running it, and gets asked to shutdown the task
rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(rogueWorkerHolder.getWorker())
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1"))
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1", WorkerConfig.DEFAULT_CATEGORY))
.anyTimes();
rogueWorkerHolder.shutdownTask(task.getId());
EasyMock.replay(rogueWorkerHolder);
@ -1055,7 +1056,7 @@ public class HttpRemoteTaskRunnerTest
// "rogue-worker" reports FAILURE for the tasks, ignored
rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(rogueWorkerHolder.getWorker())
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1"))
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1", WorkerConfig.DEFAULT_CATEGORY))
.anyTimes();
EasyMock.replay(rogueWorkerHolder);
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
@ -1094,7 +1095,7 @@ public class HttpRemoteTaskRunnerTest
listenerNotificationsAccumulator
);
Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1");
Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY);
WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes();
@ -1153,7 +1154,7 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunner taskRunner =
createTaskRunnerForTestTaskAddedOrUpdated(taskStorage, listenerNotificationsAccumulator);
Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1");
Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY);
WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes();

View File

@ -28,6 +28,7 @@ import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerHistoryItem;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
@ -59,7 +60,7 @@ public class WorkerHolderTest
new HttpRemoteTaskRunnerConfig(),
EasyMock.createNiceMock(ScheduledExecutorService.class),
(taskAnnouncement, holder) -> updates.add(taskAnnouncement),
new Worker("http", "localhost", "127.0.0.1", 5, "v0"),
new Worker("http", "localhost", "127.0.0.1", 5, "v0", WorkerConfig.DEFAULT_CATEGORY),
ImmutableList.of(
TaskAnnouncement.create(
task0,

View File

@ -26,6 +26,7 @@ import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
@ -55,31 +56,31 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
ImmutableMap.of(
"localhost0",
new ImmutableWorkerInfo(
new Worker("http", "localhost0", "localhost0", 2, "v1"), 0,
new Worker("http", "localhost0", "localhost0", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost1",
new ImmutableWorkerInfo(
new Worker("http", "localhost1", "localhost1", 2, "v1"), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
new Worker("http", "localhost1", "localhost1", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost2",
new ImmutableWorkerInfo(
new Worker("http", "localhost2", "localhost2", 2, "v1"), 1,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
new Worker("http", "localhost2", "localhost2", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 1,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost3",
new ImmutableWorkerInfo(
new Worker("http", "localhost3", "localhost3", 2, "v1"), 1,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
new Worker("http", "localhost3", "localhost3", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 1,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
)
),
noopTask
@ -99,17 +100,17 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableWorkerInfo(
new Worker("http", "lhost", "lhost", 1, "v1"), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
new Worker("http", "lhost", "lhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost",
new ImmutableWorkerInfo(
new Worker("http", "localhost", "localhost", 1, "v1"), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
)
),
new NoopTask(null, null, null, 1, 0, null, null, null)
@ -129,10 +130,10 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
ImmutableMap.of(
"localhost",
new ImmutableWorkerInfo(
new Worker("http", "localhost", "localhost", 1, "v1"), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
)
),
new NoopTask(null, null, null, 1, 0, null, null, null)

View File

@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.setup;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashSet;
public class EqualDistributionWithCategorySpecWorkerSelectStrategyTest
{
private static final ImmutableMap<String, ImmutableWorkerInfo> WORKERS_FOR_TIER_TESTS =
ImmutableMap.of(
"localhost0",
new ImmutableWorkerInfo(
new Worker("http", "localhost0", "localhost0", 1, "v1", "c1"), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost1",
new ImmutableWorkerInfo(
new Worker("http", "localhost1", "localhost1", 2, "v1", "c1"), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost2",
new ImmutableWorkerInfo(
new Worker("http", "localhost2", "localhost2", 3, "v1", "c2"), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost3",
new ImmutableWorkerInfo(
new Worker("http", "localhost3", "localhost3", 4, "v1", "c2"), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
)
);
@Test
public void testFindWorkerForTaskWithNullWorkerTierSpec()
{
ImmutableWorkerInfo worker = selectWorker(null);
Assert.assertEquals("localhost3", worker.getWorker().getHost());
}
@Test
public void testFindWorkerForTaskWithPreferredTier()
{
// test defaultCategory != null and categoryAffinity is not empty
final WorkerCategorySpec workerCategorySpec1 = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
"c2",
ImmutableMap.of("ds1", "c2")
)
),
false
);
ImmutableWorkerInfo worker1 = selectWorker(workerCategorySpec1);
Assert.assertEquals("localhost3", worker1.getWorker().getHost());
// test defaultCategory == null and categoryAffinity is not empty
final WorkerCategorySpec workerCategorySpec2 = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
null,
ImmutableMap.of("ds1", "c2")
)
),
false
);
ImmutableWorkerInfo worker2 = selectWorker(workerCategorySpec2);
Assert.assertEquals("localhost3", worker2.getWorker().getHost());
// test defaultCategory != null and categoryAffinity is empty
final WorkerCategorySpec workerCategorySpec3 = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
"c2",
null
)
),
false
);
ImmutableWorkerInfo worker3 = selectWorker(workerCategorySpec3);
Assert.assertEquals("localhost3", worker3.getWorker().getHost());
}
@Test
public void testFindWorkerForTaskWithNullPreferredTier()
{
final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
null,
null
)
),
false
);
ImmutableWorkerInfo worker = selectWorker(workerCategorySpec);
Assert.assertEquals("localhost3", worker.getWorker().getHost());
}
@Test
public void testWeakTierSpec()
{
final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
ImmutableMap.of("ds1", "c3")
)
),
false
);
ImmutableWorkerInfo worker = selectWorker(workerCategorySpec);
Assert.assertEquals("localhost3", worker.getWorker().getHost());
}
@Test
public void testStrongTierSpec()
{
final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
ImmutableMap.of("ds1", "c3")
)
),
true
);
ImmutableWorkerInfo worker = selectWorker(workerCategorySpec);
Assert.assertNull(worker);
}
private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec)
{
final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = new EqualDistributionWithCategorySpecWorkerSelectStrategy(
workerCategorySpec);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_TIER_TESTS,
new NoopTask(null, null, "ds1", 1, 0, null, null, null)
);
return worker;
}
}

View File

@ -25,6 +25,7 @@ import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;
@ -37,28 +38,28 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableMap.of(
"localhost0",
new ImmutableWorkerInfo(
new Worker("http", "localhost0", "localhost0", 2, "v1"), 0,
new Worker("http", "localhost0", "localhost0", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost1",
new ImmutableWorkerInfo(
new Worker("http", "localhost1", "localhost1", 2, "v1"), 0,
new Worker("http", "localhost1", "localhost1", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost2",
new ImmutableWorkerInfo(
new Worker("http", "localhost2", "localhost2", 2, "v1"), 1,
new Worker("http", "localhost2", "localhost2", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 1,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost3",
new ImmutableWorkerInfo(
new Worker("http", "localhost3", "localhost3", 2, "v1"), 1,
new Worker("http", "localhost3", "localhost3", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 1,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
@ -75,14 +76,14 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableWorkerInfo(
new Worker("http", "lhost", "lhost", 1, "v1"), 0,
new Worker("http", "lhost", "lhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost",
new ImmutableWorkerInfo(
new Worker("http", "localhost", "localhost", 1, "v1"), 1,
new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 1,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
@ -110,14 +111,14 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableWorkerInfo(
new Worker("http", "lhost", "lhost", 5, "v1"), 5,
new Worker("http", "lhost", "lhost", 5, "v1", WorkerConfig.DEFAULT_CATEGORY), 5,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost",
new ImmutableWorkerInfo(
new Worker("http", "localhost", "localhost", 10, "v1"), 5,
new Worker("http", "localhost", "localhost", 10, "v1", WorkerConfig.DEFAULT_CATEGORY), 5,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
@ -146,14 +147,14 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableWorkerInfo(
new Worker("http", "disableHost", "disableHost", 10, disabledVersion), 2,
new Worker("http", "disableHost", "disableHost", 10, disabledVersion, WorkerConfig.DEFAULT_CATEGORY), 2,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost",
new ImmutableWorkerInfo(
new Worker("http", "enableHost", "enableHost", 10, "v1"), 5,
new Worker("http", "enableHost", "enableHost", 10, "v1", WorkerConfig.DEFAULT_CATEGORY), 5,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
@ -182,14 +183,14 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableWorkerInfo(
new Worker("http", "disableHost", "disableHost", 10, disabledVersion), 5,
new Worker("http", "disableHost", "disableHost", 10, disabledVersion, WorkerConfig.DEFAULT_CATEGORY), 5,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost",
new ImmutableWorkerInfo(
new Worker("http", "enableHost", "enableHost", 10, "v1"), 5,
new Worker("http", "enableHost", "enableHost", 10, "v1", WorkerConfig.DEFAULT_CATEGORY), 5,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()

View File

@ -25,6 +25,7 @@ import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;
@ -45,14 +46,14 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableWorkerInfo(
new Worker("http", "lhost", "lhost", 1, "v1"), 0,
new Worker("http", "lhost", "lhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost",
new ImmutableWorkerInfo(
new Worker("http", "localhost", "localhost", 1, "v1"), 0,
new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
@ -82,14 +83,14 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
ImmutableMap.of(
"lhost",
new ImmutableWorkerInfo(
new Worker("http", "lhost", "lhost", 1, "v1"), 0,
new Worker("http", "lhost", "lhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost",
new ImmutableWorkerInfo(
new Worker("http", "localhost", "localhost", 1, "v1"), 0,
new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
@ -112,7 +113,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
ImmutableMap.of(
"localhost",
new ImmutableWorkerInfo(
new Worker("http", "localhost", "localhost", 1, "v1"), 0,
new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()

View File

@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.setup;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashSet;
public class FillCapacityWithCategorySpecWorkerSelectStrategyTest
{
private static final ImmutableMap<String, ImmutableWorkerInfo> WORKERS_FOR_TIER_TESTS =
ImmutableMap.of(
"localhost0",
new ImmutableWorkerInfo(
new Worker("http", "localhost0", "localhost0", 5, "v1", "c1"), 1,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost1",
new ImmutableWorkerInfo(
new Worker("http", "localhost1", "localhost1", 5, "v1", "c1"), 2,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost2",
new ImmutableWorkerInfo(
new Worker("http", "localhost2", "localhost2", 5, "v1", "c2"), 3,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
"localhost3",
new ImmutableWorkerInfo(
new Worker("http", "localhost3", "localhost3", 5, "v1", "c2"), 4,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
)
);
@Test
public void testFindWorkerForTaskWithNullWorkerCategorySpec()
{
ImmutableWorkerInfo worker = selectWorker(null);
Assert.assertEquals("localhost3", worker.getWorker().getHost());
}
@Test
public void testFindWorkerForTaskWithPreferredCategory()
{
// test defaultCategory != null and categoryAffinity is not empty
final WorkerCategorySpec workerCategorySpec1 = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
ImmutableMap.of("ds1", "c1")
)
),
false
);
ImmutableWorkerInfo worker1 = selectWorker(workerCategorySpec1);
Assert.assertEquals("localhost1", worker1.getWorker().getHost());
// test defaultCategory == null and categoryAffinity is not empty
final WorkerCategorySpec workerCategorySpec2 = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
null,
ImmutableMap.of("ds1", "c1")
)
),
false
);
ImmutableWorkerInfo worker2 = selectWorker(workerCategorySpec2);
Assert.assertEquals("localhost1", worker2.getWorker().getHost());
// test defaultCategory != null and categoryAffinity is null
final WorkerCategorySpec workerCategorySpec3 = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
null
)
),
false
);
ImmutableWorkerInfo worker3 = selectWorker(workerCategorySpec3);
Assert.assertEquals("localhost1", worker3.getWorker().getHost());
}
@Test
public void testFindWorkerForTaskWithNullPreferredCategory()
{
final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
null,
null
)
),
false
);
ImmutableWorkerInfo worker = selectWorker(workerCategorySpec);
Assert.assertEquals("localhost3", worker.getWorker().getHost());
}
@Test
public void testWeakCategorySpec()
{
final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
ImmutableMap.of("ds1", "c3")
)
),
false
);
ImmutableWorkerInfo worker = selectWorker(workerCategorySpec);
Assert.assertEquals("localhost3", worker.getWorker().getHost());
}
@Test
public void testStrongCategorySpec()
{
final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
ImmutableMap.of("ds1", "c3")
)
),
true
);
ImmutableWorkerInfo worker = selectWorker(workerCategorySpec);
Assert.assertNull(worker);
}
private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec)
{
final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = new FillCapacityWithCategorySpecWorkerSelectStrategy(
workerCategorySpec);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_CATEGORY_TESTS,
new NoopTask(null, null, "ds1", 1, 0, null, null, null)
);
return worker;
}
}
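The strategy class exercised by these tests is not included in this excerpt, so the snippet below is a hypothetical, simplified sketch of the category-resolution order the assertions imply: a datasource entry in `categoryAffinity` wins, otherwise the task type's `defaultCategory` applies, otherwise there is no preference. The class and method names in the sketch are illustrative only and are not part of this commit.

```java
import java.util.Map;

public class CategoryResolutionSketch
{
  // Hypothetical helper mirroring the resolution order the tests above assert:
  // 1) a datasource-specific entry in categoryAffinity wins,
  // 2) otherwise the task type's defaultCategory,
  // 3) otherwise null, i.e. no category preference at all.
  static String resolvePreferredCategory(
      Map<String, String> categoryAffinity, // dataSource -> category, may be null
      String defaultCategory,               // may be null
      String dataSource
  )
  {
    if (categoryAffinity != null && categoryAffinity.containsKey(dataSource)) {
      return categoryAffinity.get(dataSource);
    }
    return defaultCategory;
  }

  public static void main(String[] args)
  {
    Map<String, String> affinity = Map.of("ds1", "c2");
    System.out.println(resolvePreferredCategory(affinity, "c1", "ds1")); // c2: affinity wins
    System.out.println(resolvePreferredCategory(affinity, "c1", "ds2")); // c1: default category
    System.out.println(resolvePreferredCategory(null, null, "ds1"));     // null: no preference
  }
}
```

When no worker of the resolved category exists, the weak/strong tests above expect a fallback to any available worker while `strong` is false, and no assignment at all (a null worker) once `strong` is true.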

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.setup;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class WorkerCategorySpecTest
{
private ObjectMapper mapper;
@Before
public void setUp()
{
mapper = new DefaultObjectMapper();
}
@Test
public void testSerde() throws Exception
{
String jsonStr = "{\n"
+ " \"strong\": true,\n"
+ " \"categoryMap\": {\n"
+ " \"index_kafka\": {\"defaultCategory\": \"c1\", \"categoryAffinity\": {\"ds1\": \"c2\"}}\n"
+ " }\n"
+ "}";
WorkerCategorySpec workerCategorySpec = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
WorkerCategorySpec.class
)
), WorkerCategorySpec.class
);
Assert.assertTrue(workerCategorySpec.isStrong());
Assert.assertEquals(ImmutableMap.of(
"index_kafka",
new WorkerCategorySpec.CategoryConfig("c1", ImmutableMap.of("ds1", "c2"))
), workerCategorySpec.getCategoryMap());
}
}

View File

@ -43,6 +43,7 @@ import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
import org.apache.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
@ -112,7 +113,8 @@ public class WorkerTaskMonitorTest
"worker",
"localhost",
3,
"0"
"0",
WorkerConfig.DEFAULT_CATEGORY
);
workerCuratorCoordinator = new WorkerCuratorCoordinator(

View File

@ -29,6 +29,7 @@ import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
import org.apache.druid.indexing.worker.WorkerTaskMonitor;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.initialization.IndexerZkConfig;
@ -77,7 +78,8 @@ public class WorkerResourceTest
"host",
"ip",
3,
"v1"
"v1",
WorkerConfig.DEFAULT_CATEGORY
);
curatorCoordinator = new WorkerCuratorCoordinator(

View File

@ -33,16 +33,19 @@ public class WorkerNodeService extends DruidService
private final String ip;
private final int capacity;
private final String version;
private final String category;
public WorkerNodeService(
@JsonProperty("ip") String ip,
@JsonProperty("capacity") int capacity,
@JsonProperty("version") String version
@JsonProperty("version") String version,
@JsonProperty("category") String category
)
{
this.ip = ip;
this.capacity = capacity;
this.version = version;
this.category = category;
}
@Override
@ -69,6 +72,12 @@ public class WorkerNodeService extends DruidService
return version;
}
@JsonProperty
public String getCategory()
{
return category;
}
@Override
public boolean equals(Object o)
{
@ -81,13 +90,14 @@ public class WorkerNodeService extends DruidService
WorkerNodeService that = (WorkerNodeService) o;
return capacity == that.capacity &&
Objects.equals(ip, that.ip) &&
Objects.equals(version, that.version);
Objects.equals(version, that.version) &&
Objects.equals(category, that.category);
}
@Override
public int hashCode()
{
return Objects.hash(ip, capacity, version);
return Objects.hash(ip, capacity, version, category);
}
@Override
@ -97,6 +107,7 @@ public class WorkerNodeService extends DruidService
"ip='" + ip + '\'' +
", capacity=" + capacity +
", version='" + version + '\'' +
", category='" + category + '\'' +
'}';
}
}

View File

@ -25,11 +25,14 @@ import org.apache.druid.utils.JvmUtils;
import org.joda.time.Period;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
/**
*/
public class WorkerConfig
{
public static final String DEFAULT_CATEGORY = "_default_worker_category";
@JsonProperty
private String ip = DruidNode.getDefaultHost();
@ -41,6 +44,9 @@ public class WorkerConfig
private int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1);
@JsonProperty
@NotNull
private String category = DEFAULT_CATEGORY;
private long intermediaryPartitionDiscoveryPeriodSec = 60L;
@JsonProperty
@ -70,6 +76,11 @@ public class WorkerConfig
return capacity;
}
public String getCategory()
{
return category;
}
public long getIntermediaryPartitionDiscoveryPeriodSec()
{
return intermediaryPartitionDiscoveryPeriodSec;
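`WorkerConfig.category` defaults to `DEFAULT_CATEGORY`, so MiddleManagers that never configure it keep working unchanged with the category-aware strategies. The snippet below is an assumed `runtime.properties` entry for tagging a worker; the `druid.worker.category` property name is inferred from the existing `druid.worker.*` binding used by `ip`, `version`, and `capacity`, and does not appear in this excerpt.

```properties
# Assumed MiddleManager runtime.properties entry (property name inferred from
# the druid.worker.* binding of WorkerConfig; not part of this diff):
druid.worker.category=c1
```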

View File

@ -34,7 +34,8 @@ public class WorkerNodeServiceTest
DruidService expected = new WorkerNodeService(
"1.1.1.1",
100,
"v1"
"v1",
"c1"
);
ObjectMapper mapper = TestHelper.makeJsonMapper();

View File

@ -161,7 +161,8 @@ public class CliIndexer extends ServerRunnable
node.getHostAndPortToUse(),
config.getIp(),
config.getCapacity(),
config.getVersion()
config.getVersion(),
WorkerConfig.DEFAULT_CATEGORY
);
}
@ -172,7 +173,8 @@ public class CliIndexer extends ServerRunnable
return new WorkerNodeService(
workerConfig.getIp(),
workerConfig.getCapacity(),
workerConfig.getVersion()
workerConfig.getVersion(),
WorkerConfig.DEFAULT_CATEGORY
);
}
},

View File

@ -156,7 +156,8 @@ public class CliMiddleManager extends ServerRunnable
node.getHostAndPortToUse(),
config.getIp(),
config.getCapacity(),
config.getVersion()
config.getVersion(),
config.getCategory()
);
}
@ -167,7 +168,8 @@ public class CliMiddleManager extends ServerRunnable
return new WorkerNodeService(
workerConfig.getIp(),
workerConfig.getCapacity(),
workerConfig.getVersion()
workerConfig.getVersion(),
workerConfig.getCategory()
);
}
},

View File

@ -1630,6 +1630,16 @@ v0.12.0
versionReplacementString
workerId
yyyy-MM-dd
taskType
index_kafka
c1
c2
ds1
equalDistributionWithCategorySpec
fillCapacityWithCategorySpec
WorkerCategorySpec
workerCategorySpec
CategoryConfig
- ../docs/design/index.md
logsearch
- ../docs/ingestion/index.md