Add config to limit task slots for parallel indexing tasks (#12221)

In extreme cases where many parallel indexing jobs are submitted together, it is possible
that the `ParallelIndexSupervisorTasks` take up all slots leaving no slot to schedule
their own sub-tasks thus stalling progress of all the indexing jobs.

Key changes:
- Add config `druid.indexer.runner.parallelIndexTaskSlotRatio` to limit the task slots
  for `ParallelIndexSupervisorTasks` per worker
- `ratio = 1` implies supervisor tasks can use all slots on a worker if needed (default behavior)
- `ratio = 0` implies supervisor tasks can not use any slot on a worker
   (actually, at least 1 slot is always available to ensure progress of parallel indexing jobs)
- `ImmutableWorkerInfo.canRunTask()`
- `WorkerHolder`, `ZkWorker`, `WorkerSelectUtils`
This commit is contained in:
AmatyaAvadhanula 2022-02-15 23:15:09 +05:30 committed by GitHub
parent 47153cd7bd
commit 393e9b68a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 170 additions and 5 deletions

View File

@ -1066,6 +1066,7 @@ The following configs only apply if the Overlord is running in remote mode. For
|--------|-----------|-------| |--------|-----------|-------|
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task as been assigned to a MiddleManager before throwing an error.|PT5M| |`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task as been assigned to a MiddleManager before throwing an error.|PT5M|
|`druid.indexer.runner.minWorkerVersion`|The minimum MiddleManager version to send tasks to. |"0"| |`druid.indexer.runner.minWorkerVersion`|The minimum MiddleManager version to send tasks to. |"0"|
| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range [0, 1]. |1|
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the Overlord should expect MiddleManagers to compress Znodes.|true| |`druid.indexer.runner.compressZnodes`|Indicates whether or not the Overlord should expect MiddleManagers to compress Znodes.|true|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper, should be in the range of [10KiB, 2GiB). [Human-readable format](human-readable-byte.md) is supported.| 512 KiB | |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper, should be in the range of [10KiB, 2GiB). [Human-readable format](human-readable-byte.md) is supported.| 512 KiB |
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a MiddleManager is disconnected from Zookeeper.|PT15M| |`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a MiddleManager is disconnected from Zookeeper.|PT15M|

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.Worker;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -39,6 +40,7 @@ public class ImmutableWorkerInfo
{ {
private final Worker worker; private final Worker worker;
private final int currCapacityUsed; private final int currCapacityUsed;
private final int currParallelIndexCapacityUsed;
private final ImmutableSet<String> availabilityGroups; private final ImmutableSet<String> availabilityGroups;
private final ImmutableSet<String> runningTasks; private final ImmutableSet<String> runningTasks;
private final DateTime lastCompletedTaskTime; private final DateTime lastCompletedTaskTime;
@ -48,6 +50,7 @@ public class ImmutableWorkerInfo
public ImmutableWorkerInfo( public ImmutableWorkerInfo(
@JsonProperty("worker") Worker worker, @JsonProperty("worker") Worker worker,
@JsonProperty("currCapacityUsed") int currCapacityUsed, @JsonProperty("currCapacityUsed") int currCapacityUsed,
@JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed,
@JsonProperty("availabilityGroups") Set<String> availabilityGroups, @JsonProperty("availabilityGroups") Set<String> availabilityGroups,
@JsonProperty("runningTasks") Collection<String> runningTasks, @JsonProperty("runningTasks") Collection<String> runningTasks,
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime, @JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime,
@ -56,12 +59,26 @@ public class ImmutableWorkerInfo
{ {
this.worker = worker; this.worker = worker;
this.currCapacityUsed = currCapacityUsed; this.currCapacityUsed = currCapacityUsed;
this.currParallelIndexCapacityUsed = currParallelIndexCapacityUsed;
this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups); this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
this.runningTasks = ImmutableSet.copyOf(runningTasks); this.runningTasks = ImmutableSet.copyOf(runningTasks);
this.lastCompletedTaskTime = lastCompletedTaskTime; this.lastCompletedTaskTime = lastCompletedTaskTime;
this.blacklistedUntil = blacklistedUntil; this.blacklistedUntil = blacklistedUntil;
} }
public ImmutableWorkerInfo(
Worker worker,
int currCapacityUsed,
int currParallelIndexCapacityUsed,
Set<String> availabilityGroups,
Collection<String> runningTasks,
DateTime lastCompletedTaskTime
)
{
this(worker, currCapacityUsed, currParallelIndexCapacityUsed, availabilityGroups,
runningTasks, lastCompletedTaskTime, null);
}
public ImmutableWorkerInfo( public ImmutableWorkerInfo(
Worker worker, Worker worker,
int currCapacityUsed, int currCapacityUsed,
@ -70,7 +87,7 @@ public class ImmutableWorkerInfo
DateTime lastCompletedTaskTime DateTime lastCompletedTaskTime
) )
{ {
this(worker, currCapacityUsed, availabilityGroups, runningTasks, lastCompletedTaskTime, null); this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks, lastCompletedTaskTime, null);
} }
@JsonProperty("worker") @JsonProperty("worker")
@ -85,6 +102,12 @@ public class ImmutableWorkerInfo
return currCapacityUsed; return currCapacityUsed;
} }
@JsonProperty("currParallelIndexCapacityUsed")
public int getCurrParallelIndexCapacityUsed()
{
return currParallelIndexCapacityUsed;
}
@JsonProperty("availabilityGroups") @JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups() public Set<String> getAvailabilityGroups()
{ {
@ -119,12 +142,36 @@ public class ImmutableWorkerInfo
return worker.getVersion().compareTo(minVersion) >= 0; return worker.getVersion().compareTo(minVersion) >= 0;
} }
public boolean canRunTask(Task task) public boolean canRunTask(Task task, double parallelIndexTaskSlotRatio)
{ {
return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity() return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
&& canRunParallelIndexTask(task, parallelIndexTaskSlotRatio)
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup())); && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
} }
private boolean canRunParallelIndexTask(Task task, double parallelIndexTaskSlotRatio)
{
if (!task.getType().equals(ParallelIndexSupervisorTask.TYPE)) {
return true;
}
return getWorkerParallelIndexCapacity(parallelIndexTaskSlotRatio) - getCurrParallelIndexCapacityUsed()
>= task.getTaskResource().getRequiredCapacity();
}
private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
{
int totalCapacity = worker.getCapacity();
int workerParallelIndexCapacity = (int) Math.floor(parallelIndexTaskSlotRatio * totalCapacity);
if (workerParallelIndexCapacity < 1) {
workerParallelIndexCapacity = 1;
}
if (workerParallelIndexCapacity > totalCapacity) {
workerParallelIndexCapacity = totalCapacity;
}
return workerParallelIndexCapacity;
}
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
@ -140,6 +187,9 @@ public class ImmutableWorkerInfo
if (currCapacityUsed != that.currCapacityUsed) { if (currCapacityUsed != that.currCapacityUsed) {
return false; return false;
} }
if (currParallelIndexCapacityUsed != that.currParallelIndexCapacityUsed) {
return false;
}
if (!worker.equals(that.worker)) { if (!worker.equals(that.worker)) {
return false; return false;
} }
@ -162,6 +212,7 @@ public class ImmutableWorkerInfo
{ {
int result = worker.hashCode(); int result = worker.hashCode();
result = 31 * result + currCapacityUsed; result = 31 * result + currCapacityUsed;
result = 31 * result + currParallelIndexCapacityUsed;
result = 31 * result + availabilityGroups.hashCode(); result = 31 * result + availabilityGroups.hashCode();
result = 31 * result + runningTasks.hashCode(); result = 31 * result + runningTasks.hashCode();
result = 31 * result + lastCompletedTaskTime.hashCode(); result = 31 * result + lastCompletedTaskTime.hashCode();
@ -175,6 +226,7 @@ public class ImmutableWorkerInfo
return "ImmutableWorkerInfo{" + return "ImmutableWorkerInfo{" +
"worker=" + worker + "worker=" + worker +
", currCapacityUsed=" + currCapacityUsed + ", currCapacityUsed=" + currCapacityUsed +
", currParallelIndexCapacityUsed=" + currParallelIndexCapacityUsed +
", availabilityGroups=" + availabilityGroups + ", availabilityGroups=" + availabilityGroups +
", runningTasks=" + runningTasks + ", runningTasks=" + runningTasks +
", lastCompletedTaskTime=" + lastCompletedTaskTime + ", lastCompletedTaskTime=" + lastCompletedTaskTime +

View File

@ -28,6 +28,7 @@ import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.druid.annotations.UsedInGeneratedCode; import org.apache.druid.annotations.UsedInGeneratedCode;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
@ -110,6 +111,18 @@ public class ZkWorker implements Closeable
return currCapacity; return currCapacity;
} }
@JsonProperty("currParallelIndexCapacityUsed")
public int getCurrParallelIndexCapacityUsed()
{
int currParallelIndexCapacityUsed = 0;
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) {
currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity();
}
}
return currParallelIndexCapacityUsed;
}
@JsonProperty("availabilityGroups") @JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups() public Set<String> getAvailabilityGroups()
{ {
@ -168,6 +181,7 @@ public class ZkWorker implements Closeable
return new ImmutableWorkerInfo( return new ImmutableWorkerInfo(
worker.get(), worker.get(),
getCurrCapacityUsed(), getCurrCapacityUsed(),
getCurrParallelIndexCapacityUsed(),
getAvailabilityGroups(), getAvailabilityGroups(),
getRunningTaskIds(), getRunningTaskIds(),
lastCompletedTaskTime.get(), lastCompletedTaskTime.get(),

View File

@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@ -477,9 +478,13 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableWorker, Task task) private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableWorker, Task task)
{ {
int parallelIndexTaskCapacity = task.getType().equals(ParallelIndexSupervisorTask.TYPE)
? task.getTaskResource().getRequiredCapacity()
: 0;
return new ImmutableWorkerInfo( return new ImmutableWorkerInfo(
immutableWorker.getWorker(), immutableWorker.getWorker(),
immutableWorker.getCurrCapacityUsed() + 1, immutableWorker.getCurrCapacityUsed() + 1,
immutableWorker.getCurrParallelIndexCapacityUsed() + parallelIndexTaskCapacity,
Sets.union( Sets.union(
immutableWorker.getAvailabilityGroups(), immutableWorker.getAvailabilityGroups(),
Sets.newHashSet( Sets.newHashSet(

View File

@ -26,8 +26,26 @@ public class WorkerTaskRunnerConfig
@JsonProperty @JsonProperty
private String minWorkerVersion = "0"; private String minWorkerVersion = "0";
@JsonProperty
private double parallelIndexTaskSlotRatio = 1;
public String getMinWorkerVersion() public String getMinWorkerVersion()
{ {
return minWorkerVersion; return minWorkerVersion;
} }
/**
* The number of task slots that a parallel indexing task can take is restricted using this config as a multiplier
*
* A value of 1 means no restriction on the number of slots ParallelIndexSupervisorTasks can occupy (default behaviour)
* A value of 0 means ParallelIndexSupervisorTasks can occupy no slots.
* Deadlocks can occur if the all task slots are occupied by ParallelIndexSupervisorTasks,
* as no subtask would ever get a slot. Set this config to a value < 1 to prevent deadlocks.
*
* @return ratio of task slots available to a parallel indexing task at a worker level
*/
public double getParallelIndexTaskSlotRatio()
{
return parallelIndexTaskSlotRatio;
}
} }

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.TaskRunnerUtils; import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
@ -149,6 +150,17 @@ public class WorkerHolder
return currCapacity; return currCapacity;
} }
private int getCurrParallelIndexCapcityUsed()
{
int currParallelIndexCapacityUsed = 0;
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) {
currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity();
}
}
return currParallelIndexCapacityUsed;
}
private Set<String> getAvailabilityGroups() private Set<String> getAvailabilityGroups()
{ {
Set<String> retVal = new HashSet<>(); Set<String> retVal = new HashSet<>();
@ -193,6 +205,7 @@ public class WorkerHolder
return new ImmutableWorkerInfo( return new ImmutableWorkerInfo(
w, w,
getCurrCapacityUsed(), getCurrCapacityUsed(),
getCurrParallelIndexCapcityUsed(),
getAvailabilityGroups(), getAvailabilityGroups(),
getRunningTasks().keySet(), getRunningTasks().keySet(),
lastCompletedTaskTime.get(), lastCompletedTaskTime.get(),

View File

@ -150,7 +150,7 @@ public class WorkerSelectUtils
{ {
return allWorkers.values() return allWorkers.values()
.stream() .stream()
.filter(worker -> worker.canRunTask(task) .filter(worker -> worker.canRunTask(task, workerTaskRunnerConfig.getParallelIndexTaskSlotRatio())
&& worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion())) && worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
.collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity())); .collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
} }

View File

@ -22,6 +22,10 @@ package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
@ -29,6 +33,9 @@ import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ImmutableWorkerInfoTest public class ImmutableWorkerInfoTest
{ {
@Test @Test
@ -193,6 +200,7 @@ public class ImmutableWorkerInfoTest
"http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY "http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
), ),
3, 3,
0,
ImmutableSet.of("grp1", "grp2"), ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"), ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:01Z"), DateTimes.of("2015-01-01T01:01:01Z"),
@ -202,6 +210,7 @@ public class ImmutableWorkerInfoTest
"http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY "http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
), ),
2, 2,
0,
ImmutableSet.of("grp1", "grp2"), ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"), ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:02Z"), DateTimes.of("2015-01-01T01:01:02Z"),
@ -209,6 +218,57 @@ public class ImmutableWorkerInfoTest
), false); ), false);
} }
@Test
public void test_canRunTask()
{
ImmutableWorkerInfo workerInfo = new ImmutableWorkerInfo(
new Worker("http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY),
6,
0,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:02Z")
);
// Parallel index task
TaskResource taskResource0 = mock(TaskResource.class);
when(taskResource0.getRequiredCapacity()).thenReturn(3);
Task parallelIndexTask = mock(ParallelIndexSupervisorTask.class);
when(parallelIndexTask.getType()).thenReturn(ParallelIndexSupervisorTask.TYPE);
when(parallelIndexTask.getTaskResource()).thenReturn(taskResource0);
// Since task satisifies parallel and total slot constraints, can run
Assert.assertTrue(workerInfo.canRunTask(parallelIndexTask, 0.5));
// Since task fails the parallel slot constraint, it cannot run (3 > 1)
Assert.assertFalse(workerInfo.canRunTask(parallelIndexTask, 0.1));
// Some other indexing task
TaskResource taskResource1 = mock(TaskResource.class);
when(taskResource1.getRequiredCapacity()).thenReturn(5);
Task anyOtherTask = mock(IndexTask.class);
when(anyOtherTask.getType()).thenReturn("index");
when(anyOtherTask.getTaskResource()).thenReturn(taskResource1);
// Not a parallel index task -> satisfies parallel index constraint
// But does not satisfy the total slot constraint and cannot run (11 > 10)
Assert.assertFalse(workerInfo.canRunTask(anyOtherTask, 0.5));
// Task has an availability conflict ("grp1")
TaskResource taskResource2 = mock(TaskResource.class);
when(taskResource2.getRequiredCapacity()).thenReturn(1);
when(taskResource2.getAvailabilityGroup()).thenReturn("grp1");
Task grp1Task = mock(IndexTask.class);
when(grp1Task.getType()).thenReturn("blah");
when(grp1Task.getTaskResource()).thenReturn(taskResource2);
// Satisifies parallel index and total index slot constraints but cannot run due availability
Assert.assertFalse(workerInfo.canRunTask(grp1Task, 0.3));
}
private void assertEqualsAndHashCode(ImmutableWorkerInfo o1, ImmutableWorkerInfo o2, boolean shouldMatch) private void assertEqualsAndHashCode(ImmutableWorkerInfo o1, ImmutableWorkerInfo o2, boolean shouldMatch)
{ {
if (shouldMatch) { if (shouldMatch) {

View File

@ -57,10 +57,11 @@ public class JavaScriptWorkerSelectStrategyTest
+ "}\n" + "}\n"
+ "Array.prototype.sort.call(sortedWorkers,function(a, b){return zkWorkers.get(b).getCurrCapacityUsed() - zkWorkers.get(a).getCurrCapacityUsed();});\n" + "Array.prototype.sort.call(sortedWorkers,function(a, b){return zkWorkers.get(b).getCurrCapacityUsed() - zkWorkers.get(a).getCurrCapacityUsed();});\n"
+ "var minWorkerVer = config.getMinWorkerVersion();\n" + "var minWorkerVer = config.getMinWorkerVersion();\n"
+ "var parallelIndexTaskSlotRatio = config.getParallelIndexTaskSlotRatio();\n"
+ "for (var i = 0; i < sortedWorkers.length; i++) {\n" + "for (var i = 0; i < sortedWorkers.length; i++) {\n"
+ " var worker = sortedWorkers[i];\n" + " var worker = sortedWorkers[i];\n"
+ " var zkWorker = zkWorkers.get(worker);\n" + " var zkWorker = zkWorkers.get(worker);\n"
+ " if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {\n" + " if (zkWorker.canRunTask(task, parallelIndexTaskSlotRatio) && zkWorker.isValidVersion(minWorkerVer)) {\n"
+ " if (task.getType() == 'index_hadoop' && batch_workers.contains(worker)) {\n" + " if (task.getType() == 'index_hadoop' && batch_workers.contains(worker)) {\n"
+ " return worker;\n" + " return worker;\n"
+ " } else {\n" + " } else {\n"
@ -238,8 +239,9 @@ public class JavaScriptWorkerSelectStrategyTest
private ImmutableWorkerInfo createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion) private ImmutableWorkerInfo createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion)
{ {
ImmutableWorkerInfo worker = EasyMock.createMock(ImmutableWorkerInfo.class); ImmutableWorkerInfo worker = EasyMock.createMock(ImmutableWorkerInfo.class);
EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class))).andReturn(canRunTask).anyTimes(); EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class), EasyMock.anyDouble())).andReturn(canRunTask).anyTimes();
EasyMock.expect(worker.getCurrCapacityUsed()).andReturn(currCapacityUsed).anyTimes(); EasyMock.expect(worker.getCurrCapacityUsed()).andReturn(currCapacityUsed).anyTimes();
EasyMock.expect(worker.getCurrParallelIndexCapacityUsed()).andReturn(0).anyTimes();
EasyMock.expect(worker.isValidVersion(EasyMock.anyString())).andReturn(isValidVersion).anyTimes(); EasyMock.expect(worker.isValidVersion(EasyMock.anyString())).andReturn(isValidVersion).anyTimes();
EasyMock.replay(worker); EasyMock.replay(worker);
return worker; return worker;