diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableWorkerInfo.java index 9d4026004f7..67ac2967149 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableWorkerInfo.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableWorkerInfo.java @@ -74,6 +74,10 @@ public class ImmutableWorkerInfo return availabilityGroups; } + public int getAvailableCapacity() { + return getWorker().getCapacity() - getCurrCapacityUsed(); + } + @JsonProperty("runningTasks") public Set getRunningTasks() { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java index ef0a41c2f63..becc8053b54 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java @@ -22,7 +22,6 @@ package io.druid.indexing.overlord.setup; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; -import com.google.common.primitives.Ints; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig; @@ -39,28 +38,13 @@ public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrate WorkerTaskRunnerConfig config, ImmutableMap zkWorkers, Task task ) { + // the version sorting is needed because if the workers have the same available capacity only one of them is + // returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't + // run new tasks, so in this case the workers are sorted using version to ensure that if exists enable + // workers the comparator return one of them. final TreeSet sortedWorkers = Sets.newTreeSet( - new Comparator() - { - @Override - public int compare( - ImmutableWorkerInfo zkWorker, ImmutableWorkerInfo zkWorker2 - ) - { - int retVal = -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed()); - // the version sorting is needed because if the workers have the same currCapacityUsed only one of them is - // returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't - // run new tasks, so in this case the workers are sorted using version to ensure that if exists enable - // workers the comparator return one of them. - - if(retVal == 0) { - retVal = zkWorker2.getWorker().getVersion().compareTo(zkWorker.getWorker().getVersion()); - } - - return retVal; - } - } - ); + Comparator.comparing(ImmutableWorkerInfo::getAvailableCapacity).reversed() + .thenComparing(zkWorker -> zkWorker.getWorker().getVersion())); sortedWorkers.addAll(zkWorkers.values()); final String minWorkerVer = config.getMinWorkerVersion(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java index e53dc25d27a..aad319300ed 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java @@ -69,6 +69,42 @@ public class EqualDistributionWorkerSelectStrategyTest Assert.assertEquals("lhost", worker.getWorker().getHost()); } + @Test + public void testFindWorkerForTaskWhenSameCurrCapacityUsed() throws Exception + { + final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(); + + Optional optional = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + ImmutableMap.of( + "lhost", + new ImmutableWorkerInfo( + new Worker("lhost", "lhost", 5, "v1"), 5, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ), + "localhost", + new ImmutableWorkerInfo( + new Worker("localhost", "localhost", 10, "v1"), 5, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ) + ), + new NoopTask(null, 1, 0, null, null, null) + { + @Override + public String getDataSource() + { + return "foo"; + } + } + ); + ImmutableWorkerInfo worker = optional.get(); + Assert.assertEquals("localhost", worker.getWorker().getHost()); + } + @Test public void testOneDisableWorkerDifferentUsedCapacity() throws Exception { @@ -78,28 +114,28 @@ public class EqualDistributionWorkerSelectStrategyTest Optional optional = strategy.findWorkerForTask( new RemoteTaskRunnerConfig(), ImmutableMap.of( - "lhost", - new ImmutableWorkerInfo( - new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2, - Sets.newHashSet(), - Sets.newHashSet(), - DateTime.now() - ), - "localhost", - new ImmutableWorkerInfo( - new Worker("enableHost", "enableHost", 10, "v1"), 5, - Sets.newHashSet(), - Sets.newHashSet(), - DateTime.now() - ) + "lhost", + new ImmutableWorkerInfo( + new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ), + "localhost", + new ImmutableWorkerInfo( + new Worker("enableHost", "enableHost", 10, "v1"), 5, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ) ), new NoopTask(null, 1, 0, null, null, null) { @Override public String getDataSource() - { - return "foo"; - } + { + return "foo"; + } } ); ImmutableWorkerInfo worker = optional.get(); @@ -113,31 +149,31 @@ public class EqualDistributionWorkerSelectStrategyTest final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(); Optional optional = strategy.findWorkerForTask( - new RemoteTaskRunnerConfig(), - ImmutableMap.of( - "lhost", - new ImmutableWorkerInfo( - new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5, - Sets.newHashSet(), - Sets.newHashSet(), - DateTime.now() - ), - "localhost", - new ImmutableWorkerInfo( - new Worker("enableHost", "enableHost", 10, "v1"), 5, - Sets.newHashSet(), - Sets.newHashSet(), - DateTime.now() - ) + new RemoteTaskRunnerConfig(), + ImmutableMap.of( + "lhost", + new ImmutableWorkerInfo( + new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ), - new NoopTask(null, 1, 0, null, null, null) - { - @Override - public String getDataSource() - { - return "foo"; - } - } + "localhost", + new ImmutableWorkerInfo( + new Worker("enableHost", "enableHost", 10, "v1"), 5, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ) + ), + new NoopTask(null, 1, 0, null, null, null) + { + @Override + public String getDataSource() + { + return "foo"; + } + } ); ImmutableWorkerInfo worker = optional.get(); Assert.assertEquals("enableHost", worker.getWorker().getHost());