diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index 9739bc63aac..2d00c140514 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -183,7 +183,7 @@ Issuing a GET request at the same URL will return the current worker config spec |Property|Description|Default| |--------|-----------|-------| -|`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution` and `javascript`.|fillCapacity| +|`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution`, `equalDistributionWithAffinity` and `javascript`.|fillCapacity| |`autoScaler`|Only used if autoscaling is enabled. See below.|null| To view the audit history of worker config issue a GET request to the URL - @@ -233,6 +233,18 @@ The workers with the least amount of tasks is assigned the task. |--------|-----------|-------| |`type`|`equalDistribution`.|required; must be `equalDistribution`| +##### Equal Distribution With Affinity + +An affinity config can be provided. + +|Property|Description|Default| +|--------|-----------|-------| +|`type`|`equalDistributionWithAffinity`.|required; must be `equalDistributionWithAffinity`| +|`affinity`|Exactly same with `fillCapacityWithAffinity` 's affinity.|{}| + +Tasks will try to be assigned to preferred workers. Equal Distribution strategy is used if no preference for a datasource specified. + + ##### Javascript Allows defining arbitrary logic for selecting workers to run task using a JavaScript function. diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/AffinityConfig.java similarity index 91% rename from indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityConfig.java rename to indexing-service/src/main/java/io/druid/indexing/overlord/setup/AffinityConfig.java index 0a22cbe5098..7d06bcd6940 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/AffinityConfig.java @@ -28,13 +28,13 @@ import java.util.Map; /** */ -public class FillCapacityWithAffinityConfig +public class AffinityConfig { // key:Datasource, value:[nodeHostNames] private Map> affinity = Maps.newHashMap(); @JsonCreator - public FillCapacityWithAffinityConfig( + public AffinityConfig( @JsonProperty("affinity") Map> affinity ) { @@ -57,7 +57,7 @@ public class FillCapacityWithAffinityConfig return false; } - FillCapacityWithAffinityConfig that = (FillCapacityWithAffinityConfig) o; + AffinityConfig that = (AffinityConfig) o; if (affinity != null ? !Maps.difference(affinity, that.affinity).entriesDiffering().isEmpty() diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategy.java new file mode 100644 index 00000000000..17b7b81228c --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategy.java @@ -0,0 +1,124 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.setup; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ImmutableWorkerInfo; +import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig; + +import java.util.List; +import java.util.Set; + +/** + */ +public class EqualDistributionWithAffinityWorkerSelectStrategy extends EqualDistributionWorkerSelectStrategy +{ + private final AffinityConfig affinityConfig; + private final Set affinityWorkerHosts = Sets.newHashSet(); + + @JsonCreator + public EqualDistributionWithAffinityWorkerSelectStrategy( + @JsonProperty("affinityConfig") AffinityConfig affinityConfig + ) + { + this.affinityConfig = affinityConfig; + for (List affinityWorkers : affinityConfig.getAffinity().values()) { + for (String affinityWorker : affinityWorkers) { + this.affinityWorkerHosts.add(affinityWorker); + } + } + } + + @JsonProperty + public AffinityConfig getAffinityConfig() + { + return affinityConfig; + } + + @Override + public Optional findWorkerForTask( + final WorkerTaskRunnerConfig config, + final ImmutableMap zkWorkers, + final Task task + ) + { + // don't run other datasources on affinity workers; we only want our configured datasources to run on them + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + for (String workerHost : zkWorkers.keySet()) { + if (!affinityWorkerHosts.contains(workerHost)) { + builder.put(workerHost, zkWorkers.get(workerHost)); + } + } + ImmutableMap eligibleWorkers = builder.build(); + + List workerHosts = affinityConfig.getAffinity().get(task.getDataSource()); + if (workerHosts == null) { + return super.findWorkerForTask(config, eligibleWorkers, task); + } + + ImmutableMap.Builder affinityBuilder = new ImmutableMap.Builder<>(); + for (String workerHost : workerHosts) { + ImmutableWorkerInfo zkWorker = zkWorkers.get(workerHost); + if (zkWorker != null) { + affinityBuilder.put(workerHost, zkWorker); + } + } + ImmutableMap affinityWorkers = affinityBuilder.build(); + + if (!affinityWorkers.isEmpty()) { + Optional retVal = super.findWorkerForTask(config, affinityWorkers, task); + if (retVal.isPresent()) { + return retVal; + } + } + + return super.findWorkerForTask(config, eligibleWorkers, task); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + EqualDistributionWithAffinityWorkerSelectStrategy that = (EqualDistributionWithAffinityWorkerSelectStrategy) o; + + if (affinityConfig != null ? !affinityConfig.equals(that.affinityConfig) : that.affinityConfig != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return affinityConfig != null ? affinityConfig.hashCode() : 0; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java index 3bab3e1b208..2b01bb70268 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java @@ -35,12 +35,12 @@ import java.util.Set; */ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWorkerSelectStrategy { - private final FillCapacityWithAffinityConfig affinityConfig; + private final AffinityConfig affinityConfig; private final Set affinityWorkerHosts = Sets.newHashSet(); @JsonCreator public FillCapacityWithAffinityWorkerSelectStrategy( - @JsonProperty("affinityConfig") FillCapacityWithAffinityConfig affinityConfig + @JsonProperty("affinityConfig") AffinityConfig affinityConfig ) { this.affinityConfig = affinityConfig; @@ -52,7 +52,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWo } @JsonProperty - public FillCapacityWithAffinityConfig getAffinityConfig() + public AffinityConfig getAffinityConfig() { return affinityConfig; } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java index e42ae55425a..87fcfb69c5b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -35,6 +35,7 @@ import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig; @JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "fillCapacityWithAffinity", value = FillCapacityWithAffinityWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "equalDistribution", value = EqualDistributionWorkerSelectStrategy.class), + @JsonSubTypes.Type(name = "equalDistributionWithAffinity", value = EqualDistributionWithAffinityWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class) }) public interface WorkerSelectStrategy diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java new file mode 100644 index 00000000000..1e576d3f7da --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.setup; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import io.druid.indexing.common.task.NoopTask; +import io.druid.indexing.overlord.ImmutableWorkerInfo; +import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.worker.Worker; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +public class EqualDistributionWithAffinityWorkerSelectStrategyTest +{ + @Test + public void testFindWorkerForTask() throws Exception + { + EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy( + new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost1", "localhost2", "localhost3"))) + ); + + Optional optional = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + ImmutableMap.of( + "localhost0", + new ImmutableWorkerInfo( + new Worker("localhost0", "localhost0", 2, "v1"), 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ), + "localhost1", + new ImmutableWorkerInfo( + new Worker("localhost1", "localhost1", 2, "v1"), 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ), + "localhost2", + new ImmutableWorkerInfo( + new Worker("localhost2", "localhost2", 2, "v1"), 1, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ), + "localhost3", + new ImmutableWorkerInfo( + new Worker("localhost3", "localhost3", 2, "v1"), 1, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ) + ), + new NoopTask(null, 1, 0, null, null, null) + { + @Override + public String getDataSource() + { + return "foo"; + } + } + ); + ImmutableWorkerInfo worker = optional.get(); + Assert.assertEquals("localhost1", worker.getWorker().getHost()); + } + + @Test + public void testFindWorkerForTaskWithNulls() throws Exception + { + EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy( + new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) + ); + + Optional optional = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + ImmutableMap.of( + "lhost", + new ImmutableWorkerInfo( + new Worker("lhost", "lhost", 1, "v1"), 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ), + "localhost", + new ImmutableWorkerInfo( + new Worker("localhost", "localhost", 1, "v1"), 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ) + ), + new NoopTask(null, 1, 0, null, null, null) + ); + ImmutableWorkerInfo worker = optional.get(); + Assert.assertEquals("lhost", worker.getWorker().getHost()); + } + + @Test + public void testIsolation() throws Exception + { + EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy( + new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) + ); + + Optional optional = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + ImmutableMap.of( + "localhost", + new ImmutableWorkerInfo( + new Worker("localhost", "localhost", 1, "v1"), 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ) + ), + new NoopTask(null, 1, 0, null, null, null) + ); + Assert.assertFalse(optional.isPresent()); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java index bd928d73b48..d113e37d80d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java @@ -38,7 +38,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest public void testFindWorkerForTask() throws Exception { FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy( - new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) + new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) ); Optional optional = strategy.findWorkerForTask( @@ -76,7 +76,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest public void testFindWorkerForTaskWithNulls() throws Exception { FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy( - new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) + new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) ); Optional optional = strategy.findWorkerForTask( @@ -107,7 +107,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest public void testIsolation() throws Exception { FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy( - new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) + new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) ); Optional optional = strategy.findWorkerForTask( diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java index 705727fa865..d92fcfb66ec 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java @@ -41,7 +41,7 @@ public class WorkerBehaviorConfigTest { WorkerBehaviorConfig config = new WorkerBehaviorConfig( new FillCapacityWithAffinityWorkerSelectStrategy( - new FillCapacityWithAffinityConfig( + new AffinityConfig( ImmutableMap.of("foo", Arrays.asList("localhost")) ) ),