Add EqualDistributionWithAffinityWorkerSelectStrategy which balance w… (#3998)

* Add EqualDistributionWithAffinityWorkerSelectStrategy which balance work load within affinity workers.

* add docs to equalDistributionWithAffinity
This commit is contained in:
JackyWoo 2017-03-26 10:15:49 +08:00 committed by Gian Merlino
parent 90f9932bd3
commit a0f2cf05d5
8 changed files with 290 additions and 11 deletions

View File

@ -183,7 +183,7 @@ Issuing a GET request at the same URL will return the current worker config spec
|Property|Description|Default| |Property|Description|Default|
|--------|-----------|-------| |--------|-----------|-------|
|`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution` and `javascript`.|fillCapacity| |`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution`, `equalDistributionWithAffinity` and `javascript`.|fillCapacity|
|`autoScaler`|Only used if autoscaling is enabled. See below.|null| |`autoScaler`|Only used if autoscaling is enabled. See below.|null|
To view the audit history of worker config issue a GET request to the URL - To view the audit history of worker config issue a GET request to the URL -
@ -233,6 +233,18 @@ The workers with the least amount of tasks is assigned the task.
|--------|-----------|-------| |--------|-----------|-------|
|`type`|`equalDistribution`.|required; must be `equalDistribution`| |`type`|`equalDistribution`.|required; must be `equalDistribution`|
##### Equal Distribution With Affinity
An affinity config can be provided.
|Property|Description|Default|
|--------|-----------|-------|
|`type`|`equalDistributionWithAffinity`.|required; must be `equalDistributionWithAffinity`|
|`affinity`|Exactly same with `fillCapacityWithAffinity` 's affinity.|{}|
Tasks will try to be assigned to preferred workers. Equal Distribution strategy is used if no preference for a datasource specified.
##### Javascript ##### Javascript
Allows defining arbitrary logic for selecting workers to run task using a JavaScript function. Allows defining arbitrary logic for selecting workers to run task using a JavaScript function.

View File

@ -28,13 +28,13 @@ import java.util.Map;
/** /**
*/ */
public class FillCapacityWithAffinityConfig public class AffinityConfig
{ {
// key:Datasource, value:[nodeHostNames] // key:Datasource, value:[nodeHostNames]
private Map<String, List<String>> affinity = Maps.newHashMap(); private Map<String, List<String>> affinity = Maps.newHashMap();
@JsonCreator @JsonCreator
public FillCapacityWithAffinityConfig( public AffinityConfig(
@JsonProperty("affinity") Map<String, List<String>> affinity @JsonProperty("affinity") Map<String, List<String>> affinity
) )
{ {
@ -57,7 +57,7 @@ public class FillCapacityWithAffinityConfig
return false; return false;
} }
FillCapacityWithAffinityConfig that = (FillCapacityWithAffinityConfig) o; AffinityConfig that = (AffinityConfig) o;
if (affinity != null if (affinity != null
? !Maps.difference(affinity, that.affinity).entriesDiffering().isEmpty() ? !Maps.difference(affinity, that.affinity).entriesDiffering().isEmpty()

View File

@ -0,0 +1,124 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import java.util.List;
import java.util.Set;
/**
*/
public class EqualDistributionWithAffinityWorkerSelectStrategy extends EqualDistributionWorkerSelectStrategy
{
private final AffinityConfig affinityConfig;
private final Set<String> affinityWorkerHosts = Sets.newHashSet();
@JsonCreator
public EqualDistributionWithAffinityWorkerSelectStrategy(
@JsonProperty("affinityConfig") AffinityConfig affinityConfig
)
{
this.affinityConfig = affinityConfig;
for (List<String> affinityWorkers : affinityConfig.getAffinity().values()) {
for (String affinityWorker : affinityWorkers) {
this.affinityWorkerHosts.add(affinityWorker);
}
}
}
@JsonProperty
public AffinityConfig getAffinityConfig()
{
return affinityConfig;
}
@Override
public Optional<ImmutableWorkerInfo> findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
// don't run other datasources on affinity workers; we only want our configured datasources to run on them
ImmutableMap.Builder<String, ImmutableWorkerInfo> builder = new ImmutableMap.Builder<>();
for (String workerHost : zkWorkers.keySet()) {
if (!affinityWorkerHosts.contains(workerHost)) {
builder.put(workerHost, zkWorkers.get(workerHost));
}
}
ImmutableMap<String, ImmutableWorkerInfo> eligibleWorkers = builder.build();
List<String> workerHosts = affinityConfig.getAffinity().get(task.getDataSource());
if (workerHosts == null) {
return super.findWorkerForTask(config, eligibleWorkers, task);
}
ImmutableMap.Builder<String, ImmutableWorkerInfo> affinityBuilder = new ImmutableMap.Builder<>();
for (String workerHost : workerHosts) {
ImmutableWorkerInfo zkWorker = zkWorkers.get(workerHost);
if (zkWorker != null) {
affinityBuilder.put(workerHost, zkWorker);
}
}
ImmutableMap<String, ImmutableWorkerInfo> affinityWorkers = affinityBuilder.build();
if (!affinityWorkers.isEmpty()) {
Optional<ImmutableWorkerInfo> retVal = super.findWorkerForTask(config, affinityWorkers, task);
if (retVal.isPresent()) {
return retVal;
}
}
return super.findWorkerForTask(config, eligibleWorkers, task);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EqualDistributionWithAffinityWorkerSelectStrategy that = (EqualDistributionWithAffinityWorkerSelectStrategy) o;
if (affinityConfig != null ? !affinityConfig.equals(that.affinityConfig) : that.affinityConfig != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return affinityConfig != null ? affinityConfig.hashCode() : 0;
}
}

View File

@ -35,12 +35,12 @@ import java.util.Set;
*/ */
public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWorkerSelectStrategy public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWorkerSelectStrategy
{ {
private final FillCapacityWithAffinityConfig affinityConfig; private final AffinityConfig affinityConfig;
private final Set<String> affinityWorkerHosts = Sets.newHashSet(); private final Set<String> affinityWorkerHosts = Sets.newHashSet();
@JsonCreator @JsonCreator
public FillCapacityWithAffinityWorkerSelectStrategy( public FillCapacityWithAffinityWorkerSelectStrategy(
@JsonProperty("affinityConfig") FillCapacityWithAffinityConfig affinityConfig @JsonProperty("affinityConfig") AffinityConfig affinityConfig
) )
{ {
this.affinityConfig = affinityConfig; this.affinityConfig = affinityConfig;
@ -52,7 +52,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWo
} }
@JsonProperty @JsonProperty
public FillCapacityWithAffinityConfig getAffinityConfig() public AffinityConfig getAffinityConfig()
{ {
return affinityConfig; return affinityConfig;
} }

View File

@ -35,6 +35,7 @@ import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "fillCapacityWithAffinity", value = FillCapacityWithAffinityWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "fillCapacityWithAffinity", value = FillCapacityWithAffinityWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "equalDistribution", value = EqualDistributionWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "equalDistribution", value = EqualDistributionWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "equalDistributionWithAffinity", value = EqualDistributionWithAffinityWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class) @JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class)
}) })
public interface WorkerSelectStrategy public interface WorkerSelectStrategy

View File

@ -0,0 +1,142 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.setup;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class EqualDistributionWithAffinityWorkerSelectStrategyTest
{
@Test
public void testFindWorkerForTask() throws Exception
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost1", "localhost2", "localhost3")))
);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"localhost0",
new ImmutableWorkerInfo(
new Worker("localhost0", "localhost0", 2, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost1",
new ImmutableWorkerInfo(
new Worker("localhost1", "localhost1", 2, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost2",
new ImmutableWorkerInfo(
new Worker("localhost2", "localhost2", 2, "v1"), 1,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost3",
new ImmutableWorkerInfo(
new Worker("localhost3", "localhost3", 2, "v1"), 1,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("localhost1", worker.getWorker().getHost());
}
@Test
public void testFindWorkerForTaskWithNulls() throws Exception
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableWorkerInfo(
new Worker("lhost", "lhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost",
new ImmutableWorkerInfo(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@Test
public void testIsolation() throws Exception
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"localhost",
new ImmutableWorkerInfo(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
);
Assert.assertFalse(optional.isPresent());
}
}

View File

@ -38,7 +38,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testFindWorkerForTask() throws Exception public void testFindWorkerForTask() throws Exception
{ {
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy( FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
); );
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask( Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
@ -76,7 +76,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testFindWorkerForTaskWithNulls() throws Exception public void testFindWorkerForTaskWithNulls() throws Exception
{ {
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy( FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
); );
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask( Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
@ -107,7 +107,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testIsolation() throws Exception public void testIsolation() throws Exception
{ {
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy( FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
); );
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask( Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(

View File

@ -41,7 +41,7 @@ public class WorkerBehaviorConfigTest
{ {
WorkerBehaviorConfig config = new WorkerBehaviorConfig( WorkerBehaviorConfig config = new WorkerBehaviorConfig(
new FillCapacityWithAffinityWorkerSelectStrategy( new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig( new AffinityConfig(
ImmutableMap.of("foo", Arrays.asList("localhost")) ImmutableMap.of("foo", Arrays.asList("localhost"))
) )
), ),