cleaning up some code for RTR

This commit is contained in:
Fangjin Yang 2013-03-02 08:58:22 -08:00
parent 879b2475d4
commit e5cc6d241d
1 changed files with 9 additions and 25 deletions

View File

@ -405,37 +405,18 @@ public class RemoteTaskRunner implements TaskRunner
@Override @Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{ {
log.info("Event: %s", event.getType());
if (event.getData() != null) {
log.info("Data: %s", event.getData().getPath());
}
try { try {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) ||
event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskStatus taskStatus; final TaskStatus taskStatus = jsonMapper.readValue(
// This can fail if a worker writes a bogus status. Retry if so.
try {
taskStatus = jsonMapper.readValue(
event.getData().getData(), TaskStatus.class event.getData().getData(), TaskStatus.class
); );
// This can fail if a worker writes a bogus status. Retry if so.
if (!taskStatus.getId().equals(taskId)) { if (!taskStatus.getId().equals(taskId)) {
// Sanity check
throw new ISE(
"Worker[%s] status id does not match payload id: %s != %s",
worker.getHost(),
taskId,
taskStatus.getId()
);
}
}
catch (Exception e) {
log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId);
retryTask(runningTasks.get(taskId), worker.getHost()); retryTask(runningTasks.get(taskId), worker.getHost());
throw Throwables.propagate(e); return;
} }
log.info( log.info(
@ -526,7 +507,10 @@ public class RemoteTaskRunner implements TaskRunner
for (String taskId : tasksToRetry) { for (String taskId : tasksToRetry) {
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) { if (taskRunnerWorkItem != null) {
cf.delete().guaranteed().forPath(JOINER.join(config.getTaskPath(), worker.getHost(), taskId)); String taskPath = JOINER.join(config.getTaskPath(), worker.getHost(), taskId);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
retryTask(taskRunnerWorkItem, worker.getHost()); retryTask(taskRunnerWorkItem, worker.getHost());
} else { } else {
log.warn("RemoteTaskRunner has no knowledge of task %s", taskId); log.warn("RemoteTaskRunner has no knowledge of task %s", taskId);