fix equalDistribution worker select strategy (#4318)

* fix equalDistribution worker select strategy

* replace anonymous Comparator

* keep previous version sorting comment

* fix code style

* update comment

* move JsonProperty
This commit is contained in:
chaoqiang 2017-05-25 12:30:42 +08:00 committed by Gian Merlino
parent fe42db98ac
commit 5fc4abcf71
3 changed files with 87 additions and 63 deletions

View File

@ -74,6 +74,10 @@ public class ImmutableWorkerInfo
return availabilityGroups;
}
public int getAvailableCapacity() {
return getWorker().getCapacity() - getCurrCapacityUsed();
}
@JsonProperty("runningTasks")
public Set<String> getRunningTasks()
{

View File

@ -22,7 +22,6 @@ package io.druid.indexing.overlord.setup;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@ -39,28 +38,13 @@ public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrate
WorkerTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
)
{
final TreeSet<ImmutableWorkerInfo> sortedWorkers = Sets.newTreeSet(
new Comparator<ImmutableWorkerInfo>()
{
@Override
public int compare(
ImmutableWorkerInfo zkWorker, ImmutableWorkerInfo zkWorker2
)
{
int retVal = -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
// the version sorting is needed because if the workers have the same currCapacityUsed only one of them is
// the version sorting is needed because if the workers have the same available capacity only one of them is
// returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't
// run new tasks, so in this case the workers are sorted using version to ensure that if exists enable
// workers the comparator return one of them.
if(retVal == 0) {
retVal = zkWorker2.getWorker().getVersion().compareTo(zkWorker.getWorker().getVersion());
}
return retVal;
}
}
);
final TreeSet<ImmutableWorkerInfo> sortedWorkers = Sets.newTreeSet(
Comparator.comparing(ImmutableWorkerInfo::getAvailableCapacity).reversed()
.thenComparing(zkWorker -> zkWorker.getWorker().getVersion()));
sortedWorkers.addAll(zkWorkers.values());
final String minWorkerVer = config.getMinWorkerVersion();

View File

@ -69,6 +69,42 @@ public class EqualDistributionWorkerSelectStrategyTest
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@Test
public void testFindWorkerForTaskWhenSameCurrCapacityUsed() throws Exception
{
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableWorkerInfo(
new Worker("lhost", "lhost", 5, "v1"), 5,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost",
new ImmutableWorkerInfo(
new Worker("localhost", "localhost", 10, "v1"), 5,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("localhost", worker.getWorker().getHost());
}
@Test
public void testOneDisableWorkerDifferentUsedCapacity() throws Exception
{