fix the race described in #3174 (#3205)

This commit is contained in:
Himanshu 2016-08-10 13:29:50 -05:00 committed by Fangjin Yang
parent 043562914d
commit 03cfcf002b
3 changed files with 39 additions and 42 deletions

View File

@ -82,7 +82,7 @@ The following configs only apply if the overlord is running in remote mode:
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a middle manager is disconnected from Zookeeper.|PT15M|
|`druid.indexer.runner.taskShutdownLinkTimeout`|How long to wait on a shutdown request to a middle manager before timing out|PT1M|
|`druid.indexer.runner.pendingTasksRunnerNumThreads`|Number of threads to allocate pending-tasks to workers, must be at least 1.|3|
|`druid.indexer.runner.pendingTasksRunnerNumThreads`|Number of threads to allocate pending-tasks to workers, must be at least 1.|1|
There are additional configs for autoscaling (if it is enabled):
@ -186,6 +186,8 @@ Workers are assigned tasks until capacity.
|--------|-----------|-------|
|`type`|`fillCapacity`.|required; must be `fillCapacity`|
Note that, if `druid.indexer.runner.pendingTasksRunnerNumThreads` is set to n (> 1) then it means to fill n workers upto capacity simultaneously and then moving on.
##### Fill Capacity With Affinity
An affinity config can be provided.
@ -197,6 +199,8 @@ An affinity config can be provided.
Tasks will try to be assigned to preferred workers. Fill capacity strategy is used if no preference for a datasource specified.
Note that, if `druid.indexer.runner.pendingTasksRunnerNumThreads` is set to n (> 1) then it means to fill n preferred workers upto capacity simultaneously and then moving on.
##### Equal Distribution
The workers with the least amount of tasks is assigned the task.

View File

@ -705,49 +705,46 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
ZkWorker assignedWorker = null;
Optional<ImmutableWorkerInfo> immutableZkWorker = null;
try {
immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf(
Maps.transformEntries(
Maps.filterEntries(
zkWorkers, new Predicate<Map.Entry<String, ZkWorker>>()
{
@Override
public boolean apply(Map.Entry<String, ZkWorker> input)
synchronized (workersWithUnacknowledgedTask) {
immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf(
Maps.transformEntries(
Maps.filterEntries(
zkWorkers, new Predicate<Map.Entry<String, ZkWorker>>()
{
return !lazyWorkers.containsKey(input.getKey()) &&
!workersWithUnacknowledgedTask.containsKey(input.getKey());
@Override
public boolean apply(Map.Entry<String, ZkWorker> input)
{
return !lazyWorkers.containsKey(input.getKey()) &&
!workersWithUnacknowledgedTask.containsKey(input.getKey());
}
}
}
),
new Maps.EntryTransformer<String, ZkWorker, ImmutableWorkerInfo>()
{
@Override
public ImmutableWorkerInfo transformEntry(
String key, ZkWorker value
)
),
new Maps.EntryTransformer<String, ZkWorker, ImmutableWorkerInfo>()
{
return value.toImmutable();
@Override
public ImmutableWorkerInfo transformEntry(
String key, ZkWorker value
)
{
return value.toImmutable();
}
}
}
)
),
task
);
)
),
task
);
if (immutableZkWorker.isPresent()) {
if (workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId())
== null) {
if (immutableZkWorker.isPresent() &&
workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId())
== null) {
assignedWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost());
return announceTask(task, assignedWorker, taskRunnerWorkItem);
} else {
log.debug(
"Lost race to run task [%s] on worker [%s]. Workers to ack tasks are [%s].",
task.getId(),
immutableZkWorker.get().getWorker().getHost(),
workersWithUnacknowledgedTask
);
}
}
if (assignedWorker != null) {
return announceTask(task, assignedWorker, taskRunnerWorkItem);
} else {
log.debug(
"Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].",
@ -762,10 +759,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
finally {
if (assignedWorker != null) {
workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
}
if(immutableZkWorker.isPresent()) {
//if this attempt lost the race to run the task then there might be another worker available to try on.
//if this attempt won the race to run the task then other task might be able to use this worker now after task ack.
runPendingTasks();
}

View File

@ -47,7 +47,7 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig
@JsonProperty
@Min(1)
private int pendingTasksRunnerNumThreads = 3;
private int pendingTasksRunnerNumThreads = 1;
public Period getTaskAssignmentTimeout()
{