diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java index b7774a9b419..30f678c5bac 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java @@ -20,6 +20,7 @@ package com.metamx.druid.indexing.coordinator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Maps; import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Optional; @@ -66,7 +67,6 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -254,17 +254,23 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider throw new ISE("Must start RTR first before calling bootstrap!"); } - Set existingTasks = Sets.newHashSet(); + Map existingTasks = Maps.newHashMap(); for (ZkWorker zkWorker : zkWorkers.values()) { - existingTasks.addAll(zkWorker.getRunningTasks().keySet()); + for (String runningTask : zkWorker.getRunningTasks().keySet()) { + existingTasks.put(runningTask, zkWorker.getWorker()); + } } for (Task task : tasks) { - if (existingTasks.contains(task.getId())) { - log.info("Bootstrap found %s running.", task.getId()); + Worker worker = existingTasks.get(task.getId()); + if (worker != null) { + log.info("Bootstrap found [%s] running on [%s].", task.getId(), worker.getHost()); runningTasks.put( task.getId(), - new RemoteTaskRunnerWorkItem(task, SettableFuture.create()) + new RemoteTaskRunnerWorkItem( + task, SettableFuture.create(), + worker + ) ); } } @@ -308,7 +314,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem( task, - SettableFuture.create() + SettableFuture.create(), + null ); addPendingTask(taskRunnerWorkItem); return taskRunnerWorkItem.getResult(); @@ -454,6 +461,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider */ private void cleanup(final String workerId, final String taskId) { + log.info("Cleaning up [%s]", taskId); runningTasks.remove(taskId); final String statusPath = JOINER.join(config.getIndexerStatusPath(), workerId, taskId); try { @@ -529,8 +537,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider return; } - runningTasks.put(task.getId(), workItem.withWorker(theWorker)); - log.info("Task %s switched from pending to running", task.getId()); + RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theWorker); + runningTasks.put(task.getId(), newWorkItem); + log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost()); // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running // on a worker - this avoids overflowing a worker with tasks @@ -619,7 +628,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider break; case CHILD_REMOVED: taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - taskRunnerWorkItem = runningTasks.get(taskId); + taskRunnerWorkItem = runningTasks.remove(taskId); if (taskRunnerWorkItem != null) { log.info("Task[%s] just disappeared!", taskId); taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); @@ -672,7 +681,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size()); for (Map.Entry entry : runningTasks.entrySet()) { - if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) { + if (entry.getValue() == null) { + log.error("Huh? null work item for [%s]", entry.getKey()); + } else if (entry.getValue().getWorker() == null) { + log.error("Huh? no worker for [%s]", entry.getKey()); + } else if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) { log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey()); tasksToFail.add(entry.getKey()); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java index be60c758ab6..e7d9d82e676 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java @@ -34,12 +34,13 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem public RemoteTaskRunnerWorkItem( Task task, - SettableFuture result + SettableFuture result, + Worker worker ) { super(task, result); this.result = result; - this.worker = null; + this.worker = worker; } public RemoteTaskRunnerWorkItem( @@ -71,8 +72,8 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time, worker); } - public RemoteTaskRunnerWorkItem withWorker(Worker worker) + public RemoteTaskRunnerWorkItem withWorker(Worker theWorker) { - return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), worker); + return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), theWorker); } } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java index 07d8346f65e..26366a6f681 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -140,7 +140,7 @@ public class SimpleResourceManagementStrategyTest boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -168,7 +168,7 @@ public class SimpleResourceManagementStrategyTest boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -184,7 +184,7 @@ public class SimpleResourceManagementStrategyTest provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -225,7 +225,7 @@ public class SimpleResourceManagementStrategyTest boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -243,7 +243,7 @@ public class SimpleResourceManagementStrategyTest provisionedSomething = simpleResourceManagementStrategy.doProvision( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -277,7 +277,7 @@ public class SimpleResourceManagementStrategyTest boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null) @@ -307,7 +307,7 @@ public class SimpleResourceManagementStrategyTest boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null) @@ -322,7 +322,7 @@ public class SimpleResourceManagementStrategyTest terminatedSomething = simpleResourceManagementStrategy.doTerminate( Arrays.asList( - new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) + new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null)