Collapse worker select strategies, change default, add strong affinity. (#4534)

* Collapse worker select strategies, change default, add strong affinity.

- Change default worker select strategy to equalDistribution. It is
  more generally useful than fillCapacity.
- Collapse the *WithAffinity strategies into the regular ones. The
  *WithAffinity strategies are retained for backwards compatibility.
- Change WorkerSelectStrategy to return nullable instead of Optional.
- Fix a couple of errors in the docs.

* Fix test.

* Review adjustments.

* Remove unused imports.

* Switch to DateTimes.nowUtc.

* Simplify code.

* Fix tests (worker assignment started off on a different foot)
This commit is contained in:
Gian Merlino 2017-09-04 14:40:55 -07:00 committed by GitHub
parent c7b8116b3a
commit 33c0928bed
21 changed files with 574 additions and 384 deletions

View File

@ -156,7 +156,7 @@ A sample worker config spec is shown below:
```json
{
"selectStrategy": {
"type": "fillCapacityWithAffinity",
"type": "fillCapacity",
"affinityConfig": {
"affinity": {
"datasource1": ["host1:port", "host2:port"],
@ -193,7 +193,7 @@ Issuing a GET request at the same URL will return the current worker config spec
|Property|Description|Default|
|--------|-----------|-------|
|`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution`, `equalDistributionWithAffinity` and `javascript`.|equalDistribution|
|`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `equalDistribution`, and `javascript`.|equalDistribution|
|`autoScaler`|Only used if autoscaling is enabled. See below.|null|
To view the audit history of worker config issue a GET request to the URL -
@ -212,48 +212,31 @@ http://<OVERLORD_IP>:<port>/druid/indexer/v1/worker/history?count=<n>
#### Worker Select Strategy
##### Fill Capacity
Workers are assigned tasks until capacity.
|Property|Description|Default|
|--------|-----------|-------|
|`type`|`fillCapacity`.|required; must be `fillCapacity`|
Note that, if `druid.indexer.runner.pendingTasksRunnerNumThreads` is set to n (> 1) then it means to fill n workers upto capacity simultaneously and then moving on.
##### Fill Capacity With Affinity
An affinity config can be provided.
|Property|Description|Default|
|--------|-----------|-------|
|`type`|`fillCapacityWithAffinity`.|required; must be `fillCapacityWithAffinity`|
|`affinity`|JSON object mapping a datasource String name to a list of indexing service middle manager host:port String values. Druid doesn't perform DNS resolution, so the 'host' value must match what is configured on the middle manager and what the middle manager announces itself as (examine the Overlord logs to see what your middle manager announces itself as).|{}|
Tasks will try to be assigned to preferred workers. Fill capacity strategy is used if no preference for a datasource specified.
Note that, if `druid.indexer.runner.pendingTasksRunnerNumThreads` is set to n (> 1) then it means to fill n preferred workers upto capacity simultaneously and then moving on.
Worker select strategies control how Druid assigns tasks to middleManagers.
##### Equal Distribution
The workers with the least amount of tasks is assigned the task.
Tasks are assigned to the middleManager with the most available capacity at the time the task begins running. This is
useful if you want work evenly distributed across your middleManagers.
|Property|Description|Default|
|--------|-----------|-------|
|`type`|`equalDistribution`.|required; must be `equalDistribution`|
|`affinityConfig`|[Affinity config](#affinity) object|null (no affinity)|
##### Equal Distribution With Affinity
##### Fill Capacity
An affinity config can be provided.
Tasks are assigned to the worker with the most currently-running tasks at the time the task begins running. This is
useful in situations where you are elastically auto-scaling middleManagers, since it will tend to pack some full and
leave others empty. The empty ones can be safely terminated.
Note that if `druid.indexer.runner.pendingTasksRunnerNumThreads` is set to _N_ > 1, then this strategy will fill _N_
middleManagers up to capacity simultaneously, rather than a single middleManager.
|Property|Description|Default|
|--------|-----------|-------|
|`type`|`equalDistributionWithAffinity`.|required; must be `equalDistributionWithAffinity`|
|`affinity`|Exactly same with `fillCapacityWithAffinity` 's affinity.|{}|
Tasks will try to be assigned to preferred workers. Equal Distribution strategy is used if no preference for a datasource specified.
|`type`|`fillCapacity`.|required; must be `fillCapacity`|
|`affinityConfig`|[Affinity config](#affinity) object|null (no affinity)|
##### Javascript
@ -263,7 +246,6 @@ It can be used for rapid development of missing features where the worker select
If the selection logic is quite complex and cannot be easily tested in javascript environment,
its better to write a druid extension module with extending current worker selection strategies written in java.
|Property|Description|Default|
|--------|-----------|-------|
|`type`|`javascript`.|required; must be `javascript`|
@ -282,6 +264,16 @@ Example: a function that sends batch_index_task to workers 10.0.0.1 and 10.0.0.2
JavaScript-based functionality is disabled by default. Please refer to the Druid <a href="../development/javascript.html">JavaScript programming guide</a> for guidelines about using Druid's JavaScript functionality, including instructions on how to enable it.
</div>
##### Affinity
Affinity configs can be provided to the _equalDistribution_ and _fillCapacity_ strategies using the "affinityConfig"
field. If not provided, the default is to not use affinity at all.
|Property|Description|Default|
|--------|-----------|-------|
|`affinity`|JSON object mapping a datasource String name to a list of indexing service middleManager host:port String values. Druid doesn't perform DNS resolution, so the 'host' value must match what is configured on the middleManager and what the middleManager announces itself as (examine the Overlord logs to see what your middleManager announces itself as).|{}|
|`strong`|With weak affinity (the default), tasks for a dataSource may be assigned to other middleManagers if their affinity-mapped middleManagers are not able to run all pending tasks in the queue for that dataSource. With strong affinity, tasks for a dataSource will only ever be assigned to their affinity-mapped middleManagers, and will wait in the pending queue if necessary.|false|
#### Autoscaler
Amazon's EC2 is currently the only supported autoscaler.

View File

@ -753,7 +753,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
ZkWorker assignedWorker = null;
Optional<ImmutableWorkerInfo> immutableZkWorker = null;
final ImmutableWorkerInfo immutableZkWorker;
try {
synchronized (workersWithUnacknowledgedTask) {
immutableZkWorker = strategy.findWorkerForTask(
@ -787,10 +787,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
task
);
if (immutableZkWorker.isPresent() &&
workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId())
== null) {
assignedWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost());
if (immutableZkWorker != null &&
workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.getWorker().getHost(), task.getId())
== null) {
assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost());
}
}

View File

@ -21,7 +21,6 @@ package io.druid.indexing.overlord.autoscaling;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
@ -251,14 +250,14 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
// Simulate assigning tasks to dummy workers using configured workerSelectStrategy
// the number of additional workers needed to assign all the pending tasks is noted
for (Task task : pendingTasks) {
Optional<ImmutableWorkerInfo> selectedWorker = workerSelectStrategy.findWorkerForTask(
final ImmutableWorkerInfo selectedWorker = workerSelectStrategy.findWorkerForTask(
workerTaskRunnerConfig,
ImmutableMap.copyOf(workersMap),
task
);
final ImmutableWorkerInfo workerRunningTask;
if (selectedWorker.isPresent()) {
workerRunningTask = selectedWorker.get();
if (selectedWorker != null) {
workerRunningTask = selectedWorker;
} else {
// None of the existing worker can run this task, we need to provision one worker for it.
// create a dummy worker and try to simulate assigning task to it.

View File

@ -21,34 +21,55 @@ package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
*/
public class AffinityConfig
{
// key:Datasource, value:[nodeHostNames]
private Map<String, List<String>> affinity = Maps.newHashMap();
private final Map<String, Set<String>> affinity;
private final boolean strong;
// Cache of the names of workers that have affinity for any dataSource.
// Not part of the serialized JSON or equals/hashCode.
private final Set<String> affinityWorkers;
@JsonCreator
public AffinityConfig(
@JsonProperty("affinity") Map<String, List<String>> affinity
@JsonProperty("affinity") Map<String, Set<String>> affinity,
@JsonProperty("strong") boolean strong
)
{
this.affinity = affinity;
this.strong = strong;
this.affinityWorkers = affinity.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
}
@JsonProperty
public Map<String, List<String>> getAffinity()
public Map<String, Set<String>> getAffinity()
{
return affinity;
}
@JsonProperty
public boolean isStrong()
{
return strong;
}
public Set<String> getAffinityWorkers()
{
return affinityWorkers;
}
@Override
public boolean equals(Object o)
public boolean equals(final Object o)
{
if (this == o) {
return true;
@ -56,21 +77,23 @@ public class AffinityConfig
if (o == null || getClass() != o.getClass()) {
return false;
}
AffinityConfig that = (AffinityConfig) o;
if (affinity != null
? !Maps.difference(affinity, that.affinity).entriesDiffering().isEmpty()
: that.affinity != null) {
return false;
}
return true;
final AffinityConfig that = (AffinityConfig) o;
return strong == that.strong &&
Objects.equals(affinity, that.affinity);
}
@Override
public int hashCode()
{
return affinity != null ? affinity.hashCode() : 0;
return Objects.hash(affinity, strong);
}
@Override
public String toString()
{
return "AffinityConfig{" +
"affinity=" + affinity +
", strong=" + strong +
'}';
}
}

View File

@ -21,104 +21,17 @@ package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import java.util.List;
import java.util.Set;
/**
* Only exists for backwards compatibility with existing "equalDistributionWithAffinity" worker configs.
*/
public class EqualDistributionWithAffinityWorkerSelectStrategy extends EqualDistributionWorkerSelectStrategy
{
private final AffinityConfig affinityConfig;
private final Set<String> affinityWorkerHosts = Sets.newHashSet();
@JsonCreator
public EqualDistributionWithAffinityWorkerSelectStrategy(
@JsonProperty("affinityConfig") AffinityConfig affinityConfig
@JsonProperty("affinityConfig") AffinityConfig affinityConfig
)
{
this.affinityConfig = affinityConfig;
for (List<String> affinityWorkers : affinityConfig.getAffinity().values()) {
for (String affinityWorker : affinityWorkers) {
this.affinityWorkerHosts.add(affinityWorker);
}
}
}
@JsonProperty
public AffinityConfig getAffinityConfig()
{
return affinityConfig;
}
@Override
public Optional<ImmutableWorkerInfo> findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
// don't run other datasources on affinity workers; we only want our configured datasources to run on them
ImmutableMap.Builder<String, ImmutableWorkerInfo> builder = new ImmutableMap.Builder<>();
for (String workerHost : zkWorkers.keySet()) {
if (!affinityWorkerHosts.contains(workerHost)) {
builder.put(workerHost, zkWorkers.get(workerHost));
}
}
ImmutableMap<String, ImmutableWorkerInfo> eligibleWorkers = builder.build();
List<String> workerHosts = affinityConfig.getAffinity().get(task.getDataSource());
if (workerHosts == null) {
return super.findWorkerForTask(config, eligibleWorkers, task);
}
ImmutableMap.Builder<String, ImmutableWorkerInfo> affinityBuilder = new ImmutableMap.Builder<>();
for (String workerHost : workerHosts) {
ImmutableWorkerInfo zkWorker = zkWorkers.get(workerHost);
if (zkWorker != null) {
affinityBuilder.put(workerHost, zkWorker);
}
}
ImmutableMap<String, ImmutableWorkerInfo> affinityWorkers = affinityBuilder.build();
if (!affinityWorkers.isEmpty()) {
Optional<ImmutableWorkerInfo> retVal = super.findWorkerForTask(config, affinityWorkers, task);
if (retVal.isPresent()) {
return retVal;
}
}
return super.findWorkerForTask(config, eligibleWorkers, task);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EqualDistributionWithAffinityWorkerSelectStrategy that = (EqualDistributionWithAffinityWorkerSelectStrategy) o;
if (affinityConfig != null ? !affinityConfig.equals(that.affinityConfig) : that.affinityConfig != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return affinityConfig != null ? affinityConfig.hashCode() : 0;
super(affinityConfig);
}
}

View File

@ -19,41 +19,84 @@
package io.druid.indexing.overlord.setup;
import com.google.common.base.Optional;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import java.util.Comparator;
import java.util.TreeSet;
import java.util.Map;
import java.util.Objects;
/**
*/
public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrategy
{
@Override
public Optional<ImmutableWorkerInfo> findWorkerForTask(
WorkerTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
private final AffinityConfig affinityConfig;
@JsonCreator
public EqualDistributionWorkerSelectStrategy(
@JsonProperty("affinityConfig") AffinityConfig affinityConfig
)
{
// the version sorting is needed because if the workers have the same available capacity only one of them is
// returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't
// run new tasks, so in this case the workers are sorted using version to ensure that if exists enable
// workers the comparator return one of them.
final TreeSet<ImmutableWorkerInfo> sortedWorkers = Sets.newTreeSet(
Comparator.comparing(ImmutableWorkerInfo::getAvailableCapacity).reversed()
.thenComparing(zkWorker -> zkWorker.getWorker().getVersion()));
sortedWorkers.addAll(zkWorkers.values());
final String minWorkerVer = config.getMinWorkerVersion();
this.affinityConfig = affinityConfig;
}
for (ImmutableWorkerInfo zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
return Optional.of(zkWorker);
}
@JsonProperty
public AffinityConfig getAffinityConfig()
{
return affinityConfig;
}
@Override
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
return WorkerSelectUtils.selectWorker(
task,
zkWorkers,
config,
affinityConfig,
EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
);
}
private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
{
return eligibleWorkers.values().stream().max(
Comparator.comparing(ImmutableWorkerInfo::getAvailableCapacity)
).orElse(null);
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final EqualDistributionWorkerSelectStrategy that = (EqualDistributionWorkerSelectStrategy) o;
return Objects.equals(affinityConfig, that.affinityConfig);
}
return Optional.absent();
@Override
public int hashCode()
{
return Objects.hash(affinityConfig);
}
@Override
public String toString()
{
return "EqualDistributionWorkerSelectStrategy{" +
"affinityConfig=" + affinityConfig +
'}';
}
}

View File

@ -21,105 +21,17 @@ package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import java.util.List;
import java.util.Set;
/**
* Only exists for backwards compatibility with existing "fillCapacityWithAffinity" worker configs.
*/
public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWorkerSelectStrategy
{
private final AffinityConfig affinityConfig;
private final Set<String> affinityWorkerHosts = Sets.newHashSet();
@JsonCreator
public FillCapacityWithAffinityWorkerSelectStrategy(
@JsonProperty("affinityConfig") AffinityConfig affinityConfig
)
{
this.affinityConfig = affinityConfig;
for (List<String> affinityWorkers : affinityConfig.getAffinity().values()) {
for (String affinityWorker : affinityWorkers) {
this.affinityWorkerHosts.add(affinityWorker);
}
}
}
@JsonProperty
public AffinityConfig getAffinityConfig()
{
return affinityConfig;
}
@Override
public Optional<ImmutableWorkerInfo> findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
// don't run other datasources on affinity workers; we only want our configured datasources to run on them
ImmutableMap.Builder<String, ImmutableWorkerInfo> builder = new ImmutableMap.Builder<>();
for (String workerHost : zkWorkers.keySet()) {
if (!affinityWorkerHosts.contains(workerHost)) {
builder.put(workerHost, zkWorkers.get(workerHost));
}
}
ImmutableMap<String, ImmutableWorkerInfo> eligibleWorkers = builder.build();
List<String> workerHosts = affinityConfig.getAffinity().get(task.getDataSource());
if (workerHosts == null) {
return super.findWorkerForTask(config, eligibleWorkers, task);
}
ImmutableMap.Builder<String, ImmutableWorkerInfo> affinityBuilder = new ImmutableMap.Builder<>();
for (String workerHost : workerHosts) {
ImmutableWorkerInfo zkWorker = zkWorkers.get(workerHost);
if (zkWorker != null) {
affinityBuilder.put(workerHost, zkWorker);
}
}
ImmutableMap<String, ImmutableWorkerInfo> affinityWorkers = affinityBuilder.build();
if (!affinityWorkers.isEmpty()) {
Optional<ImmutableWorkerInfo> retVal = super.findWorkerForTask(config, affinityWorkers, task);
if (retVal.isPresent()) {
return retVal;
}
}
return super.findWorkerForTask(config, eligibleWorkers, task);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FillCapacityWithAffinityWorkerSelectStrategy that = (FillCapacityWithAffinityWorkerSelectStrategy) o;
if (affinityConfig != null ? !affinityConfig.equals(that.affinityConfig) : that.affinityConfig != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return affinityConfig != null ? affinityConfig.hashCode() : 0;
super(affinityConfig);
}
}

View File

@ -19,59 +19,82 @@
package io.druid.indexing.overlord.setup;
import com.google.common.base.Optional;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import java.util.Comparator;
import java.util.TreeSet;
import java.util.Map;
import java.util.Objects;
/**
*/
public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
{
private final AffinityConfig affinityConfig;
@JsonCreator
public FillCapacityWorkerSelectStrategy(
@JsonProperty("affinityConfig") AffinityConfig affinityConfig
)
{
this.affinityConfig = affinityConfig;
}
@JsonProperty
public AffinityConfig getAffinityConfig()
{
return affinityConfig;
}
@Override
public Optional<ImmutableWorkerInfo> findWorkerForTask(
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
TreeSet<ImmutableWorkerInfo> sortedWorkers = Sets.newTreeSet(
new Comparator<ImmutableWorkerInfo>()
{
@Override
public int compare(
ImmutableWorkerInfo zkWorker, ImmutableWorkerInfo zkWorker2
)
{
int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
// the version sorting is needed because if the workers have the same currCapacityUsed only one of them is
// returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't
// run new tasks, so in this case the workers are sorted using version to ensure that if exists enable
// workers the comparator return one of them.
if (retVal == 0) {
retVal = zkWorker.getWorker().getVersion().compareTo(zkWorker2.getWorker().getVersion());
}
return retVal;
}
}
return WorkerSelectUtils.selectWorker(
task,
zkWorkers,
config,
affinityConfig,
FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers
);
sortedWorkers.addAll(zkWorkers.values());
final String minWorkerVer = config.getMinWorkerVersion();
}
for (ImmutableWorkerInfo zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
return Optional.of(zkWorker);
}
private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map<String, ImmutableWorkerInfo> eligibleWorkers)
{
return eligibleWorkers.values().stream().max(
Comparator.comparing(ImmutableWorkerInfo::getCurrCapacityUsed)
).orElse(null);
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final FillCapacityWorkerSelectStrategy that = (FillCapacityWorkerSelectStrategy) o;
return Objects.equals(affinityConfig, that.affinityConfig);
}
return Optional.absent();
@Override
public int hashCode()
{
return Objects.hash(affinityConfig);
}
@Override
public String toString()
{
return "FillCapacityWorkerSelectStrategy{" +
"affinityConfig=" + affinityConfig +
'}';
}
}

View File

@ -22,11 +22,9 @@ package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@ -73,12 +71,12 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy
}
@Override
public Optional<ImmutableWorkerInfo> findWorkerForTask(
public ImmutableWorkerInfo findWorkerForTask(
WorkerTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
)
{
String worker = fnSelector.apply(config, zkWorkers, task);
return Optional.fromNullable(worker == null ? null : zkWorkers.get(worker));
return worker == null ? null : zkWorkers.get(worker);
}
@JsonProperty

View File

@ -29,7 +29,7 @@ import io.druid.indexing.overlord.autoscaling.NoopAutoScaler;
public class WorkerBehaviorConfig
{
public static final String CONFIG_KEY = "worker.config";
public static WorkerSelectStrategy DEFAULT_STRATEGY = new EqualDistributionWorkerSelectStrategy();
public static WorkerSelectStrategy DEFAULT_STRATEGY = new EqualDistributionWorkerSelectStrategy(null);
public static AutoScaler DEFAULT_AUTOSCALER = new NoopAutoScaler();
public static WorkerBehaviorConfig defaultConfig()

View File

@ -21,12 +21,13 @@ package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import javax.annotation.Nullable;
/**
* The {@link io.druid.indexing.overlord.RemoteTaskRunner} uses this class to select a worker to assign tasks to.
*/
@ -49,7 +50,8 @@ public interface WorkerSelectStrategy
*
* @return A {@link ImmutableWorkerInfo} to run the task if one is available.
*/
Optional<ImmutableWorkerInfo> findWorkerForTask(
@Nullable
ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task

View File

@ -0,0 +1,118 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.setup;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
public class WorkerSelectUtils
{
private WorkerSelectUtils()
{
// No instantiation.
}
/**
* Helper for {@link WorkerSelectStrategy} implementations.
*
* @param allWorkers map of all workers, in the style provided to {@link WorkerSelectStrategy}
* @param affinityConfig affinity config, or null
* @param workerSelector function that receives a list of eligible workers: version is high enough, worker can run
* the task, and worker satisfies the affinity config. may return null.
*
* @return selected worker from "allWorkers", or null.
*/
@Nullable
public static ImmutableWorkerInfo selectWorker(
final Task task,
final Map<String, ImmutableWorkerInfo> allWorkers,
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
@Nullable final AffinityConfig affinityConfig,
final Function<ImmutableMap<String, ImmutableWorkerInfo>, ImmutableWorkerInfo> workerSelector
)
{
// Workers that could potentially run this task, ignoring affinityConfig.
final Map<String, ImmutableWorkerInfo> runnableWorkers = allWorkers
.values()
.stream()
.filter(worker -> worker.canRunTask(task)
&& worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
.collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
if (affinityConfig == null) {
// All runnable workers are valid.
return workerSelector.apply(ImmutableMap.copyOf(runnableWorkers));
} else {
// Workers assigned to the affinity pool for our task.
final Set<String> dataSourceWorkers = affinityConfig.getAffinity().get(task.getDataSource());
if (dataSourceWorkers == null) {
// No affinity config for this dataSource; use non-affinity workers.
return workerSelector.apply(getNonAffinityWorkers(affinityConfig, runnableWorkers));
} else {
// Get runnable, affinity workers.
final ImmutableMap<String, ImmutableWorkerInfo> dataSourceWorkerMap =
ImmutableMap.copyOf(Maps.filterKeys(runnableWorkers, dataSourceWorkers::contains));
final ImmutableWorkerInfo selected = workerSelector.apply(dataSourceWorkerMap);
if (selected != null) {
return selected;
} else if (affinityConfig.isStrong()) {
return null;
} else {
// Weak affinity allows us to use nonAffinityWorkers for this dataSource, if no affinity workers
// are available.
return workerSelector.apply(getNonAffinityWorkers(affinityConfig, runnableWorkers));
}
}
}
}
/**
* Return workers not assigned to any affinity pool at all.
*
* @param affinityConfig affinity config
* @param workerMap map of worker hostname to worker info
*
* @return map of worker hostname to worker info
*/
private static ImmutableMap<String, ImmutableWorkerInfo> getNonAffinityWorkers(
final AffinityConfig affinityConfig,
final Map<String, ImmutableWorkerInfo> workerMap
)
{
return ImmutableMap.copyOf(
Maps.filterKeys(
workerMap,
workerHost -> !affinityConfig.getAffinityWorkers().contains(workerHost)
)
);
}
}

View File

@ -23,9 +23,11 @@ import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import io.druid.TestUtil;
import io.druid.guice.ServerModule;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.expression.TestExprMacroTable;
import io.druid.segment.IndexIO;
@ -41,6 +43,8 @@ import java.util.concurrent.TimeUnit;
*/
public class TestUtils
{
private static final Logger log = new Logger(TestUtil.class);
private final ObjectMapper jsonMapper;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
@ -103,11 +107,12 @@ public class TestUtils
while (!condition.isValid()) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > timeout) {
throw new ISE("Cannot find running task");
throw new ISE("Condition[%s] not met", condition);
}
}
}
catch (Exception e) {
log.warn(e, "Condition[%s] not met within timeout[%,d]", condition, timeout);
return false;
}
return true;

View File

@ -627,11 +627,11 @@ public class RemoteTaskRunnerTest
makeRemoteTaskRunner(rtrConfig);
TestRealtimeTask task1 = new TestRealtimeTask(
"realtime1",
new TaskResource("realtime1", 1),
"foo",
TaskStatus.success("realtime1"),
jsonMapper
"realtime1",
new TaskResource("realtime1", 1),
"foo",
TaskStatus.success("realtime1"),
jsonMapper
);
Future<TaskStatus> taskFuture1 = remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
@ -639,15 +639,17 @@ public class RemoteTaskRunnerTest
mockWorkerCompleteFailedTask(task1);
Assert.assertTrue(taskFuture1.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(1,
remoteTaskRunner.findWorkerRunningTask(task1.getId()).getContinuouslyFailedTasksCount());
Assert.assertEquals(
1,
remoteTaskRunner.findWorkerRunningTask(task1.getId()).getContinuouslyFailedTasksCount()
);
TestRealtimeTask task2 = new TestRealtimeTask(
"realtime2",
new TaskResource("realtime2", 1),
"foo",
TaskStatus.running("realtime2"),
jsonMapper
"realtime2",
new TaskResource("realtime2", 1),
"foo",
TaskStatus.running("realtime2"),
jsonMapper
);
Future<TaskStatus> taskFuture2 = remoteTaskRunner.run(task2);
Assert.assertTrue(taskAnnounced(task2.getId()));
@ -672,15 +674,17 @@ public class RemoteTaskRunnerTest
// After backOffTime the nodes are removed from blacklist
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(0,
remoteTaskRunner.findWorkerRunningTask(task2.getId()).getContinuouslyFailedTasksCount());
Assert.assertEquals(
0,
remoteTaskRunner.findWorkerRunningTask(task2.getId()).getContinuouslyFailedTasksCount()
);
TestRealtimeTask task3 = new TestRealtimeTask(
"realtime3",
new TaskResource("realtime3", 1),
"foo",
TaskStatus.running("realtime3"),
jsonMapper
"realtime3",
new TaskResource("realtime3", 1),
"foo",
TaskStatus.running("realtime3"),
jsonMapper
);
Future<TaskStatus> taskFuture3 = remoteTaskRunner.run(task3);
Assert.assertTrue(taskAnnounced(task3.getId()));
@ -688,8 +692,10 @@ public class RemoteTaskRunnerTest
mockWorkerCompleteSuccessfulTask(task3);
Assert.assertTrue(taskFuture3.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isSuccess());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(0,
remoteTaskRunner.findWorkerRunningTask(task3.getId()).getContinuouslyFailedTasksCount());
Assert.assertEquals(
0,
remoteTaskRunner.findWorkerRunningTask(task3.getId()).getContinuouslyFailedTasksCount()
);
}
/**
@ -709,6 +715,9 @@ public class RemoteTaskRunnerTest
makeRemoteTaskRunner(rtrConfig);
String firstWorker = null;
String secondWorker = null;
for (int i = 1; i < 13; i++) {
String taskId = StringUtils.format("rt-%d", i);
TestRealtimeTask task = new TestRealtimeTask(
@ -717,9 +726,21 @@ public class RemoteTaskRunnerTest
Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);
rtrTestUtils.taskAnnounced(i % 2 == 0 ? "worker2" : "worker", task.getId());
rtrTestUtils.mockWorkerRunningTask(i % 2 == 0 ? "worker2" : "worker", task);
rtrTestUtils.mockWorkerCompleteFailedTask(i % 2 == 0 ? "worker2" : "worker", task);
if (i == 1) {
if (rtrTestUtils.taskAnnounced("worker2", task.getId())) {
firstWorker = "worker2";
secondWorker = "worker";
} else {
firstWorker = "worker";
secondWorker = "worker2";
}
}
final String expectedWorker = i % 2 == 0 ? secondWorker : firstWorker;
Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, task.getId()));
rtrTestUtils.mockWorkerRunningTask(expectedWorker, task);
rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task);
Assert.assertTrue(taskFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
@ -747,6 +768,9 @@ public class RemoteTaskRunnerTest
makeRemoteTaskRunner(rtrConfig);
String firstWorker = null;
String secondWorker = null;
for (int i = 1; i < 13; i++) {
String taskId = StringUtils.format("rt-%d", i);
TestRealtimeTask task = new TestRealtimeTask(
@ -755,9 +779,21 @@ public class RemoteTaskRunnerTest
Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);
rtrTestUtils.taskAnnounced(i % 2 == 0 || i > 4 ? "worker2" : "worker", task.getId());
rtrTestUtils.mockWorkerRunningTask(i % 2 == 0 || i > 4 ? "worker2" : "worker", task);
rtrTestUtils.mockWorkerCompleteFailedTask(i % 2 == 0 || i > 4 ? "worker2" : "worker", task);
if (i == 1) {
if (rtrTestUtils.taskAnnounced("worker2", task.getId())) {
firstWorker = "worker2";
secondWorker = "worker";
} else {
firstWorker = "worker";
secondWorker = "worker2";
}
}
final String expectedWorker = i % 2 == 0 || i > 4 ? secondWorker : firstWorker;
Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, task.getId()));
rtrTestUtils.mockWorkerRunningTask(expectedWorker, task);
rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task);
Assert.assertTrue(taskFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure());
Assert.assertEquals(i > 2 ? 1 : 0, remoteTaskRunner.getBlackListedWorkers().size());

View File

@ -212,6 +212,12 @@ public class RemoteTaskRunnerTestUtils
throw Throwables.propagate(e);
}
}
@Override
public String toString()
{
return StringUtils.format("Path[%s] exists", path);
}
}
);
}

View File

@ -87,7 +87,7 @@ public class PendingTaskBasedProvisioningStrategyTest
workerConfig = new AtomicReference<>(
new WorkerBehaviorConfig(
new FillCapacityWorkerSelectStrategy(),
new FillCapacityWorkerSelectStrategy(null),
autoScaler
)
);

View File

@ -19,29 +19,29 @@
package io.druid.indexing.overlord.setup;
import com.google.common.base.Optional;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import io.druid.java.util.common.DateTimes;
import io.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class EqualDistributionWithAffinityWorkerSelectStrategyTest
{
@Test
public void testFindWorkerForTask() throws Exception
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost1", "localhost2", "localhost3")))
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost1", "localhost2", "localhost3")), false)
);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"localhost0",
@ -82,7 +82,6 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
}
}
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("localhost1", worker.getWorker().getHost());
}
@ -90,10 +89,10 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
public void testFindWorkerForTaskWithNulls() throws Exception
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
@ -113,7 +112,6 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
),
new NoopTask(null, 1, 0, null, null, null)
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@ -121,10 +119,10 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
public void testIsolation() throws Exception
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"localhost",
@ -137,6 +135,20 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
),
new NoopTask(null, 1, 0, null, null, null)
);
Assert.assertFalse(optional.isPresent());
Assert.assertNull(worker);
}
@Test
public void testSerde() throws Exception
{
final ObjectMapper objectMapper = TestHelper.getJsonMapper();
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
);
final WorkerSelectStrategy strategy2 = objectMapper.readValue(
objectMapper.writeValueAsBytes(strategy),
WorkerSelectStrategy.class
);
Assert.assertEquals(strategy, strategy2);
}
}

View File

@ -19,8 +19,8 @@
package io.druid.indexing.overlord.setup;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
@ -32,13 +32,44 @@ import org.junit.Test;
public class EqualDistributionWorkerSelectStrategyTest
{
private static final ImmutableMap<String, ImmutableWorkerInfo> WORKERS_FOR_AFFINITY_TESTS =
ImmutableMap.of(
"localhost0",
new ImmutableWorkerInfo(
new Worker("http", "localhost0", "localhost0", 2, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTimes.nowUtc()
),
"localhost1",
new ImmutableWorkerInfo(
new Worker("http", "localhost1", "localhost1", 2, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTimes.nowUtc()
),
"localhost2",
new ImmutableWorkerInfo(
new Worker("http", "localhost2", "localhost2", 2, "v1"), 1,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTimes.nowUtc()
),
"localhost3",
new ImmutableWorkerInfo(
new Worker("http", "localhost3", "localhost3", 2, "v1"), 1,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTimes.nowUtc()
)
);
@Test
public void testFindWorkerForTask() throws Exception
{
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
@ -65,16 +96,15 @@ public class EqualDistributionWorkerSelectStrategyTest
}
}
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@Test
public void testFindWorkerForTaskWhenSameCurrCapacityUsed() throws Exception
{
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
@ -101,7 +131,6 @@ public class EqualDistributionWorkerSelectStrategyTest
}
}
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("localhost", worker.getWorker().getHost());
}
@ -109,9 +138,9 @@ public class EqualDistributionWorkerSelectStrategyTest
public void testOneDisableWorkerDifferentUsedCapacity() throws Exception
{
String DISABLED_VERSION = "";
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
@ -138,7 +167,6 @@ public class EqualDistributionWorkerSelectStrategyTest
}
}
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}
@ -146,9 +174,9 @@ public class EqualDistributionWorkerSelectStrategyTest
public void testOneDisableWorkerSameUsedCapacity() throws Exception
{
String DISABLED_VERSION = "";
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
@ -175,7 +203,90 @@ public class EqualDistributionWorkerSelectStrategyTest
}
}
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}
@Test
public void testWeakAffinity()
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(
new AffinityConfig(
ImmutableMap.of(
"foo", ImmutableSet.of("localhost1", "localhost2", "localhost3"),
"bar", ImmutableSet.of("nonexistent-worker")
),
false
)
);
ImmutableWorkerInfo workerFoo = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_AFFINITY_TESTS,
createDummyTask("foo")
);
Assert.assertEquals("localhost1", workerFoo.getWorker().getHost());
// With weak affinity, bar (which has no affinity workers available) can use a non-affinity worker.
ImmutableWorkerInfo workerBar = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_AFFINITY_TESTS,
createDummyTask("bar")
);
Assert.assertEquals("localhost0", workerBar.getWorker().getHost());
ImmutableWorkerInfo workerBaz = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_AFFINITY_TESTS,
createDummyTask("baz")
);
Assert.assertEquals("localhost0", workerBaz.getWorker().getHost());
}
@Test
public void testStrongAffinity()
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(
new AffinityConfig(
ImmutableMap.of(
"foo", ImmutableSet.of("localhost1", "localhost2", "localhost3"),
"bar", ImmutableSet.of("nonexistent-worker")
),
true
)
);
ImmutableWorkerInfo workerFoo = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_AFFINITY_TESTS,
createDummyTask("foo")
);
Assert.assertEquals("localhost1", workerFoo.getWorker().getHost());
// With strong affinity, no workers can be found for bar.
ImmutableWorkerInfo workerBar = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_AFFINITY_TESTS,
createDummyTask("bar")
);
Assert.assertNull(workerBar);
ImmutableWorkerInfo workerBaz = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_AFFINITY_TESTS,
createDummyTask("baz")
);
Assert.assertEquals("localhost0", workerBaz.getWorker().getHost());
}
private static NoopTask createDummyTask(final String dataSource)
{
return new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return dataSource;
}
};
}
}

View File

@ -19,8 +19,8 @@
package io.druid.indexing.overlord.setup;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
@ -30,18 +30,16 @@ import io.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class FillCapacityWithAffinityWorkerSelectStrategyTest
{
@Test
public void testFindWorkerForTask() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
@ -68,7 +66,6 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
}
}
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("localhost", worker.getWorker().getHost());
}
@ -76,10 +73,10 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testFindWorkerForTaskWithNulls() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
@ -99,7 +96,6 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
),
new NoopTask(null, 1, 0, null, null, null)
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@ -107,10 +103,10 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testIsolation() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"localhost",
@ -123,6 +119,6 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
),
new NoopTask(null, 1, 0, null, null, null)
);
Assert.assertFalse(optional.isPresent());
Assert.assertNull(worker);
}
}

View File

@ -22,7 +22,6 @@ package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
@ -126,7 +125,7 @@ public class JavaScriptWorkerSelectStrategyTest
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("index_hadoop")
).get();
);
// batch tasks should be sent to worker1
Assert.assertEquals(worker1, workerForBatchTask);
@ -134,7 +133,7 @@ public class JavaScriptWorkerSelectStrategyTest
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("other_type")
).get();
);
// all other tasks should be sent to worker2
Assert.assertEquals(worker2, workerForOtherTask);
}
@ -146,12 +145,12 @@ public class JavaScriptWorkerSelectStrategyTest
"10.0.0.1", createMockWorker(1, true, true),
"10.0.0.2", createMockWorker(1, true, true)
);
Optional<ImmutableWorkerInfo> workerForOtherTask = STRATEGY.findWorkerForTask(
ImmutableWorkerInfo workerForOtherTask = STRATEGY.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("other_type")
);
Assert.assertFalse(workerForOtherTask.isPresent());
Assert.assertNull(workerForOtherTask);
}
@Test
@ -161,20 +160,20 @@ public class JavaScriptWorkerSelectStrategyTest
"10.0.0.1", createMockWorker(1, true, false),
"10.0.0.4", createMockWorker(1, true, false)
);
Optional<ImmutableWorkerInfo> workerForBatchTask = STRATEGY.findWorkerForTask(
ImmutableWorkerInfo workerForBatchTask = STRATEGY.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("index_hadoop")
);
Assert.assertFalse(workerForBatchTask.isPresent());
Assert.assertNull(workerForBatchTask);
Optional<ImmutableWorkerInfo> workerForOtherTask = STRATEGY.findWorkerForTask(
ImmutableWorkerInfo workerForOtherTask = STRATEGY.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("otherTask")
);
// all other tasks should be sent to worker2
Assert.assertFalse(workerForOtherTask.isPresent());
Assert.assertNull(workerForOtherTask);
}
@Test
@ -184,20 +183,20 @@ public class JavaScriptWorkerSelectStrategyTest
"10.0.0.1", createMockWorker(1, false, true),
"10.0.0.4", createMockWorker(1, false, true)
);
Optional<ImmutableWorkerInfo> workerForBatchTask = STRATEGY.findWorkerForTask(
ImmutableWorkerInfo workerForBatchTask = STRATEGY.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("index_hadoop")
);
Assert.assertFalse(workerForBatchTask.isPresent());
Assert.assertNull(workerForBatchTask);
Optional<ImmutableWorkerInfo> workerForOtherTask = STRATEGY.findWorkerForTask(
ImmutableWorkerInfo workerForOtherTask = STRATEGY.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("otherTask")
);
// all other tasks should be sent to worker2
Assert.assertFalse(workerForOtherTask.isPresent());
Assert.assertNull(workerForOtherTask);
}
@Test
@ -208,12 +207,12 @@ public class JavaScriptWorkerSelectStrategyTest
"10.0.0.1", createMockWorker(1, true, true),
"10.0.0.2", createMockWorker(5, true, true)
);
Optional<ImmutableWorkerInfo> workerForBatchTask = STRATEGY.findWorkerForTask(
ImmutableWorkerInfo workerForBatchTask = STRATEGY.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("index_hadoop")
);
Assert.assertEquals(workerMap.get("10.0.0.2"), workerForBatchTask.get());
Assert.assertEquals(workerMap.get("10.0.0.2"), workerForBatchTask);
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.overlord.autoscaling.ec2.EC2AutoScaler;
import io.druid.indexing.overlord.autoscaling.ec2.EC2EnvironmentConfig;
import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData;
@ -42,7 +43,8 @@ public class WorkerBehaviorConfigTest
WorkerBehaviorConfig config = new WorkerBehaviorConfig(
new FillCapacityWithAffinityWorkerSelectStrategy(
new AffinityConfig(
ImmutableMap.of("foo", Arrays.asList("localhost"))
ImmutableMap.of("foo", ImmutableSet.of("localhost")),
false
)
),
new EC2AutoScaler(