RemoteTaskRunner: More robust task status listener

- Handle CHILD_ADDED and CHILD_UPDATED together, to cover the case where a worker
  reconnects to ZooKeeper in the middle of a task (which makes the ephemeral status
  node disappear and then reappear, surfacing as a CHILD_ADDED event)
- Most exceptions that can occur here don't warrant a retry, so only retry when a
  worker writes a bogus status; emit an alert for anything else
This commit is contained in:
Gian Merlino 2012-12-12 12:50:23 -08:00
parent f7a6ac31e3
commit 091ed069f5
1 changed files with 49 additions and 22 deletions

View File

@ -45,6 +45,7 @@ import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
import com.netflix.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
@ -383,45 +384,71 @@ public class RemoteTaskRunner implements TaskRunner
// Listener callback for child events on a worker's status path.
// For CHILD_ADDED and CHILD_UPDATED events it reads the znode payload as a TaskStatus,
// notifies a waiter on statusLock, and, once the status reports completion, invokes the
// task's callback (if any), forgets the task, and deletes the status znode.
//
// NOTE(review): this listing looks like a diff rendering in which removed and added
// lines are interleaved (taskId and taskStatus are each declared more than once, and
// two different catch clauses follow the same try). Confirm against the real file
// before relying on the statement order shown here.
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
// Hold the statusLock monitor for the whole handler; the notify() calls below require it.
synchronized (statusLock) {
String taskId = null;
try {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
TaskStatus taskStatus = jsonMapper.readValue(
event.getData().getData(), TaskStatus.class
);
log.info("New status[%s] appeared!", taskStatus.getId());
statusLock.notify();
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
String statusPath = event.getData().getPath();
TaskStatus taskStatus = jsonMapper.readValue(
event.getData().getData(), TaskStatus.class
);
taskId = taskStatus.getId();
// Additions and updates share one code path here.
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) ||
event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))
{
// The task id is derived from the znode path rather than from the payload.
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskStatus taskStatus;
log.info("Task[%s] updated status[%s]!", taskId, taskStatus.getStatusCode());
// This can fail if a worker writes a bogus status. Retry if so.
try {
taskStatus = jsonMapper.readValue(
event.getData().getData(), TaskStatus.class
);
if(!taskStatus.getId().equals(taskId)) {
// Sanity check
throw new ISE(
"Worker[%s] status id does not match payload id: %s != %s",
worker.getHost(),
taskId,
taskStatus.getId()
);
}
} catch (Exception e) {
// Log the bad payload, hand the task to retryTask, then rethrow so the
// enclosing catch also sees the failure.
log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId);
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
throw Throwables.propagate(e);
}
log.info(
"Worker[%s] wrote %s status for task: %s",
worker.getHost(),
taskStatus.getStatusCode(),
taskId
);
// Wake a thread waiting on statusLock now that a status has been observed.
statusLock.notify();
if (taskStatus.isComplete()) {
// Worker is done with this task
workerWrapper.setLastCompletedTaskTime(new DateTime());
TaskWrapper taskWrapper = tasks.get(taskId);
final TaskWrapper taskWrapper = tasks.get(taskId);
if (taskWrapper == null) {
log.warn("A task completed that I didn't know about? WTF?!");
log.warn(
"WTF?! Worker[%s] completed a task I didn't know about: %s",
worker.getHost(),
taskId
);
} else {
TaskCallback callback = taskWrapper.getCallback();
final TaskCallback callback = taskWrapper.getCallback();
// Cleanup
if (callback != null) {
callback.notify(taskStatus);
}
tasks.remove(taskId);
// The status znode is deleted in the background once the task has been handled.
cf.delete().guaranteed().inBackground().forPath(statusPath);
cf.delete().guaranteed().inBackground().forPath(event.getData().getPath());
}
}
}
}
catch (Exception e) {
log.error(e, "Exception in status listener");
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
} catch(Exception e) {
// Report the failure as an alert carrying the worker host and the znode path;
// no retry is attempted in this clause.
// NOTE(review): event.getData() is dereferenced here without a null check; confirm
// it cannot be null for the event types that can reach this handler.
log.makeAlert(e, "Failed to handle new worker status")
.addData("worker", worker.getHost())
.addData("znode", event.getData().getPath())
.emit();
}
}
}