From 09fcfc3b6dd514e70f2e002782e3a6c2d7b49ea6 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 18 Jul 2014 11:37:53 -0700 Subject: [PATCH] Fix race in RemoteTaskRunner that could lead to zombie tasks. --- .../indexing/overlord/RemoteTaskRunner.java | 36 +++++++++---------- .../overlord/RemoteTaskRunnerWorkItem.java | 20 ++++++++++- .../SimpleResourceManagementStrategyTest.java | 28 +++++++-------- 3 files changed, 51 insertions(+), 33 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 7b0dab4d512..89cc2ce9d73 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -422,11 +422,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer private RemoteTaskRunnerWorkItem addPendingTask(final Task task) { log.info("Added pending task %s", task.getId()); - final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem( - task.getId(), - SettableFuture.create(), - null - ); + final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(task.getId(), null); pendingTaskPayloads.put(task.getId(), task); pendingTasks.put(task.getId(), taskRunnerWorkItem); runPendingTasks(); @@ -663,17 +659,24 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer if ((tmp = runningTasks.get(taskId)) != null) { taskRunnerWorkItem = tmp; } else { - log.warn( - "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s", - zkWorker.getWorker().getHost(), - taskId - ); - taskRunnerWorkItem = new RemoteTaskRunnerWorkItem( + final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem( taskId, - SettableFuture.create(), zkWorker.getWorker() ); - runningTasks.put(taskId, taskRunnerWorkItem.withWorker(zkWorker.getWorker())); + final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent( + taskId, + newTaskRunnerWorkItem + ); + if (existingItem == null) { + log.warn( + "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s", + zkWorker.getWorker().getHost(), + taskId + ); + taskRunnerWorkItem = newTaskRunnerWorkItem; + } else { + taskRunnerWorkItem = existingItem; + } } if (taskStatus.isComplete()) { @@ -835,7 +838,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer // Worker is done with this task zkWorker.setLastCompletedTaskTime(new DateTime()); } else { - log.info("No worker run task[%s] with status[%s]", taskStatus.getId(), taskStatus.getStatusCode()); + log.info("Workerless task[%s] completed with status[%s]", taskStatus.getId(), taskStatus.getStatusCode()); } // Move from running -> complete @@ -843,9 +846,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer runningTasks.remove(taskStatus.getId()); // Notify interested parties - final ListenableFuture result = taskRunnerWorkItem.getResult(); - if (result != null) { - ((SettableFuture) result).set(taskStatus); - } + taskRunnerWorkItem.setResult(taskStatus); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java index 76d373a049a..86b3e583c26 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerWorkItem.java @@ -32,6 +32,24 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem private final Worker worker; public RemoteTaskRunnerWorkItem( + String taskId, + Worker worker + ) + { + this(taskId, SettableFuture.create(), worker); + } + + public RemoteTaskRunnerWorkItem( + String taskId, + DateTime createdTime, + DateTime queueInsertionTime, + Worker worker + ) + { + this(taskId, SettableFuture.create(), createdTime, queueInsertionTime, worker); + } + + private RemoteTaskRunnerWorkItem( String taskId, SettableFuture result, Worker worker @@ -42,7 +60,7 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem this.worker = worker; } - public RemoteTaskRunnerWorkItem( + private RemoteTaskRunnerWorkItem( String taskId, SettableFuture result, DateTime createdTime, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java index 6c13a0704c0..86021525496 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java @@ -27,8 +27,8 @@ import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEventBuilder; import io.druid.common.guava.DSuppliers; -import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; @@ -113,7 +113,7 @@ public class SimpleResourceManagementStrategyTest boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -141,7 +141,7 @@ public class SimpleResourceManagementStrategyTest boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -157,7 +157,7 @@ public class SimpleResourceManagementStrategyTest provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -188,7 +188,7 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList()).times(2); EasyMock.expect(autoScalingStrategy.terminateWithIds(EasyMock.>anyObject())) - .andReturn(null); + .andReturn(null); EasyMock.expect(autoScalingStrategy.provision()).andReturn( new AutoScalingData(Lists.newArrayList("fake")) ); @@ -196,7 +196,7 @@ public class SimpleResourceManagementStrategyTest boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -214,7 +214,7 @@ public class SimpleResourceManagementStrategyTest provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -248,7 +248,7 @@ public class SimpleResourceManagementStrategyTest boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null) @@ -278,7 +278,7 @@ public class SimpleResourceManagementStrategyTest boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null) @@ -293,7 +293,7 @@ public class SimpleResourceManagementStrategyTest terminatedSomething = simpleResourceManagementStrategy.doTerminate( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null) @@ -319,7 +319,7 @@ public class SimpleResourceManagementStrategyTest boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(NoopTask.create()), @@ -337,7 +337,7 @@ public class SimpleResourceManagementStrategyTest boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(NoopTask.create()), @@ -412,7 +412,7 @@ public class SimpleResourceManagementStrategyTest boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null) @@ -421,7 +421,7 @@ public class SimpleResourceManagementStrategyTest boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null)