Merge pull request #2472 from redBorder/master

Fixed equal distribution strategy when exist disabled middleManagers.
This commit is contained in:
Charles Allen 2016-02-17 07:29:14 -08:00
commit 5a69fe891a
3 changed files with 83 additions and 2 deletions

View File

@ -47,7 +47,17 @@ public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrate
ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2
)
{
return -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
int retVal = -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
// the version sorting is needed because if the workers have the same currCapacityUsed only one of them is
// returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't
// run new tasks, so in this case the workers are sorted using version to ensure that if exists enable
// workers the comparator return one of them.
if(retVal == 0) {
retVal = zkWorker2.getWorker().getVersion().compareTo(zkWorker.getWorker().getVersion());
}
return retVal;
}
}
);

View File

@ -50,8 +50,13 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
)
{
int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
// the version sorting is needed because if the workers have the same currCapacityUsed only one of them is
// returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't
// run new tasks, so in this case the workers are sorted using version to ensure that if exists enable
// workers the comparator return one of them.
if (retVal == 0) {
retVal = zkWorker.getWorker().getHost().compareTo(zkWorker2.getWorker().getHost());
retVal = zkWorker.getWorker().getVersion().compareTo(zkWorker2.getWorker().getVersion());
}
return retVal;

View File

@ -63,4 +63,70 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@Test
public void testOneDisableWorkerDifferentUsedCapacity() throws Exception
{
String DISABLED_VERSION = "";
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1"), 5,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}
@Test
public void testOneDisableWorkerSameUsedCapacity() throws Exception
{
String DISABLED_VERSION = "";
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1"), 5,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}
}