more logs for RTR

This commit is contained in:
fjy 2013-08-21 21:47:59 -07:00
parent b1d0f989ec
commit d92ab8bb58
3 changed files with 37 additions and 23 deletions

View File

@ -20,6 +20,7 @@
package com.metamx.druid.indexing.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Maps;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@ -66,7 +67,6 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@ -254,17 +254,23 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
throw new ISE("Must start RTR first before calling bootstrap!");
}
Set<String> existingTasks = Sets.newHashSet();
Map<String, Worker> existingTasks = Maps.newHashMap();
for (ZkWorker zkWorker : zkWorkers.values()) {
existingTasks.addAll(zkWorker.getRunningTasks().keySet());
for (String runningTask : zkWorker.getRunningTasks().keySet()) {
existingTasks.put(runningTask, zkWorker.getWorker());
}
}
for (Task task : tasks) {
if (existingTasks.contains(task.getId())) {
log.info("Bootstrap found %s running.", task.getId());
Worker worker = existingTasks.get(task.getId());
if (worker != null) {
log.info("Bootstrap found [%s] running on [%s].", task.getId(), worker.getHost());
runningTasks.put(
task.getId(),
new RemoteTaskRunnerWorkItem(task, SettableFuture.<TaskStatus>create())
new RemoteTaskRunnerWorkItem(
task, SettableFuture.<TaskStatus>create(),
worker
)
);
}
}
@ -308,7 +314,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
task,
SettableFuture.<TaskStatus>create()
SettableFuture.<TaskStatus>create(),
null
);
addPendingTask(taskRunnerWorkItem);
return taskRunnerWorkItem.getResult();
@ -454,6 +461,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
*/
private void cleanup(final String workerId, final String taskId)
{
log.info("Cleaning up [%s]", taskId);
runningTasks.remove(taskId);
final String statusPath = JOINER.join(config.getIndexerStatusPath(), workerId, taskId);
try {
@ -529,8 +537,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
return;
}
runningTasks.put(task.getId(), workItem.withWorker(theWorker));
log.info("Task %s switched from pending to running", task.getId());
RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theWorker);
runningTasks.put(task.getId(), newWorkItem);
log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost());
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks
@ -619,7 +628,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
break;
case CHILD_REMOVED:
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.get(taskId);
taskRunnerWorkItem = runningTasks.remove(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
@ -672,7 +681,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size());
for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : runningTasks.entrySet()) {
if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
if (entry.getValue() == null) {
log.error("Huh? null work item for [%s]", entry.getKey());
} else if (entry.getValue().getWorker() == null) {
log.error("Huh? no worker for [%s]", entry.getKey());
} else if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey());
tasksToFail.add(entry.getKey());
}

View File

@ -34,12 +34,13 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
public RemoteTaskRunnerWorkItem(
Task task,
SettableFuture<TaskStatus> result
SettableFuture<TaskStatus> result,
Worker worker
)
{
super(task, result);
this.result = result;
this.worker = null;
this.worker = worker;
}
public RemoteTaskRunnerWorkItem(
@ -71,8 +72,8 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time, worker);
}
public RemoteTaskRunnerWorkItem withWorker(Worker worker)
public RemoteTaskRunnerWorkItem withWorker(Worker theWorker)
{
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), worker);
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), theWorker);
}
}

View File

@ -140,7 +140,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -168,7 +168,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -184,7 +184,7 @@ public class SimpleResourceManagementStrategyTest
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -225,7 +225,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -243,7 +243,7 @@ public class SimpleResourceManagementStrategyTest
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -277,7 +277,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -307,7 +307,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -322,7 +322,7 @@ public class SimpleResourceManagementStrategyTest
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)