Merge pull request #644 from metamx/rtr-bugs

Fix race in RemoteTaskRunner that could lead to zombie tasks.
This commit is contained in:
fjy 2014-07-18 12:44:50 -06:00
commit 1fe82c50f1
3 changed files with 51 additions and 33 deletions

View File

@ -422,11 +422,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private RemoteTaskRunnerWorkItem addPendingTask(final Task task) private RemoteTaskRunnerWorkItem addPendingTask(final Task task)
{ {
log.info("Added pending task %s", task.getId()); log.info("Added pending task %s", task.getId());
final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem( final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(task.getId(), null);
task.getId(),
SettableFuture.<TaskStatus>create(),
null
);
pendingTaskPayloads.put(task.getId(), task); pendingTaskPayloads.put(task.getId(), task);
pendingTasks.put(task.getId(), taskRunnerWorkItem); pendingTasks.put(task.getId(), taskRunnerWorkItem);
runPendingTasks(); runPendingTasks();
@ -663,17 +659,24 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
if ((tmp = runningTasks.get(taskId)) != null) { if ((tmp = runningTasks.get(taskId)) != null) {
taskRunnerWorkItem = tmp; taskRunnerWorkItem = tmp;
} else { } else {
log.warn( final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
"Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
zkWorker.getWorker().getHost(),
taskId
);
taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
taskId, taskId,
SettableFuture.<TaskStatus>create(),
zkWorker.getWorker() zkWorker.getWorker()
); );
runningTasks.put(taskId, taskRunnerWorkItem.withWorker(zkWorker.getWorker())); final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId,
newTaskRunnerWorkItem
);
if (existingItem == null) {
log.warn(
"Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
zkWorker.getWorker().getHost(),
taskId
);
taskRunnerWorkItem = newTaskRunnerWorkItem;
} else {
taskRunnerWorkItem = existingItem;
}
} }
if (taskStatus.isComplete()) { if (taskStatus.isComplete()) {
@ -835,7 +838,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
// Worker is done with this task // Worker is done with this task
zkWorker.setLastCompletedTaskTime(new DateTime()); zkWorker.setLastCompletedTaskTime(new DateTime());
} else { } else {
log.info("No worker run task[%s] with status[%s]", taskStatus.getId(), taskStatus.getStatusCode()); log.info("Workerless task[%s] completed with status[%s]", taskStatus.getId(), taskStatus.getStatusCode());
} }
// Move from running -> complete // Move from running -> complete
@ -843,9 +846,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
runningTasks.remove(taskStatus.getId()); runningTasks.remove(taskStatus.getId());
// Notify interested parties // Notify interested parties
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult(); taskRunnerWorkItem.setResult(taskStatus);
if (result != null) {
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
} }
} }

View File

@ -32,6 +32,24 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
private final Worker worker; private final Worker worker;
public RemoteTaskRunnerWorkItem( public RemoteTaskRunnerWorkItem(
String taskId,
Worker worker
)
{
this(taskId, SettableFuture.<TaskStatus>create(), worker);
}
/**
 * Creates a work item without requiring the caller to supply the result future.
 *
 * A new, unset {@code SettableFuture<TaskStatus>} is created here and handed,
 * together with the remaining arguments, to the constructor overload that
 * accepts the future explicitly.
 *
 * @param taskId             id of the task this work item tracks
 * @param createdTime        forwarded unchanged to the delegated constructor
 *                           (presumably the item's creation timestamp; confirm
 *                           against the superclass)
 * @param queueInsertionTime forwarded unchanged to the delegated constructor
 *                           (presumably when the item entered the pending
 *                           queue; confirm against the superclass)
 * @param worker             worker associated with the task; call sites in this
 *                           change pass {@code null} for tasks that have no
 *                           worker yet
 */
public RemoteTaskRunnerWorkItem(
String taskId,
DateTime createdTime,
DateTime queueInsertionTime,
Worker worker
)
{
this(taskId, SettableFuture.<TaskStatus>create(), createdTime, queueInsertionTime, worker);
}
private RemoteTaskRunnerWorkItem(
String taskId, String taskId,
SettableFuture<TaskStatus> result, SettableFuture<TaskStatus> result,
Worker worker Worker worker
@ -42,7 +60,7 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
this.worker = worker; this.worker = worker;
} }
public RemoteTaskRunnerWorkItem( private RemoteTaskRunnerWorkItem(
String taskId, String taskId,
SettableFuture<TaskStatus> result, SettableFuture<TaskStatus> result,
DateTime createdTime, DateTime createdTime,

View File

@ -27,8 +27,8 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder; import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers; import io.druid.common.guava.DSuppliers;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
@ -113,7 +113,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -141,7 +141,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -157,7 +157,7 @@ public class SimpleResourceManagementStrategyTest
provisionedSomething = simpleResourceManagementStrategy.doProvision( provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -188,7 +188,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2); .andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScalingStrategy.terminateWithIds(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.terminateWithIds(EasyMock.<List<String>>anyObject()))
.andReturn(null); .andReturn(null);
EasyMock.expect(autoScalingStrategy.provision()).andReturn( EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake")) new AutoScalingData(Lists.<String>newArrayList("fake"))
); );
@ -196,7 +196,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -214,7 +214,7 @@ public class SimpleResourceManagementStrategyTest
provisionedSomething = simpleResourceManagementStrategy.doProvision( provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -248,7 +248,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(null) new TestZkWorker(null)
@ -278,7 +278,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(null) new TestZkWorker(null)
@ -293,7 +293,7 @@ public class SimpleResourceManagementStrategyTest
terminatedSomething = simpleResourceManagementStrategy.doTerminate( terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(null) new TestZkWorker(null)
@ -319,7 +319,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()), new TestZkWorker(NoopTask.create()),
@ -337,7 +337,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()), new TestZkWorker(NoopTask.create()),
@ -412,7 +412,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(null) new TestZkWorker(null)
@ -421,7 +421,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(null) new TestZkWorker(null)