mirror of https://github.com/apache/druid.git
Merge pull request #43 from metamx/rtr-listener
RemoteTaskRunner: More robust task status listener
This commit is contained in:
commit
f664364770
|
@ -45,6 +45,7 @@ import com.netflix.curator.framework.CuratorFramework;
|
||||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
|
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
|
||||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
|
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
|
||||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
|
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
|
||||||
|
import com.netflix.curator.utils.ZKPaths;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
@ -383,45 +384,71 @@ public class RemoteTaskRunner implements TaskRunner
|
||||||
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
|
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
|
||||||
{
|
{
|
||||||
synchronized (statusLock) {
|
synchronized (statusLock) {
|
||||||
String taskId = null;
|
|
||||||
try {
|
try {
|
||||||
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
|
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) ||
|
||||||
TaskStatus taskStatus = jsonMapper.readValue(
|
event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))
|
||||||
event.getData().getData(), TaskStatus.class
|
{
|
||||||
);
|
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
|
||||||
log.info("New status[%s] appeared!", taskStatus.getId());
|
final TaskStatus taskStatus;
|
||||||
statusLock.notify();
|
|
||||||
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
|
|
||||||
String statusPath = event.getData().getPath();
|
|
||||||
TaskStatus taskStatus = jsonMapper.readValue(
|
|
||||||
event.getData().getData(), TaskStatus.class
|
|
||||||
);
|
|
||||||
taskId = taskStatus.getId();
|
|
||||||
|
|
||||||
log.info("Task[%s] updated status[%s]!", taskId, taskStatus.getStatusCode());
|
// This can fail if a worker writes a bogus status. Retry if so.
|
||||||
|
try {
|
||||||
|
taskStatus = jsonMapper.readValue(
|
||||||
|
event.getData().getData(), TaskStatus.class
|
||||||
|
);
|
||||||
|
|
||||||
|
if(!taskStatus.getId().equals(taskId)) {
|
||||||
|
// Sanity check
|
||||||
|
throw new ISE(
|
||||||
|
"Worker[%s] status id does not match payload id: %s != %s",
|
||||||
|
worker.getHost(),
|
||||||
|
taskId,
|
||||||
|
taskStatus.getId()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId);
|
||||||
|
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"Worker[%s] wrote %s status for task: %s",
|
||||||
|
worker.getHost(),
|
||||||
|
taskStatus.getStatusCode(),
|
||||||
|
taskId
|
||||||
|
);
|
||||||
|
|
||||||
|
statusLock.notify();
|
||||||
|
|
||||||
if (taskStatus.isComplete()) {
|
if (taskStatus.isComplete()) {
|
||||||
|
// Worker is done with this task
|
||||||
workerWrapper.setLastCompletedTaskTime(new DateTime());
|
workerWrapper.setLastCompletedTaskTime(new DateTime());
|
||||||
TaskWrapper taskWrapper = tasks.get(taskId);
|
final TaskWrapper taskWrapper = tasks.get(taskId);
|
||||||
|
|
||||||
if (taskWrapper == null) {
|
if (taskWrapper == null) {
|
||||||
log.warn("A task completed that I didn't know about? WTF?!");
|
log.warn(
|
||||||
|
"WTF?! Worker[%s] completed a task I didn't know about: %s",
|
||||||
|
worker.getHost(),
|
||||||
|
taskId
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
TaskCallback callback = taskWrapper.getCallback();
|
final TaskCallback callback = taskWrapper.getCallback();
|
||||||
|
|
||||||
// Cleanup
|
// Cleanup
|
||||||
if (callback != null) {
|
if (callback != null) {
|
||||||
callback.notify(taskStatus);
|
callback.notify(taskStatus);
|
||||||
}
|
}
|
||||||
tasks.remove(taskId);
|
tasks.remove(taskId);
|
||||||
cf.delete().guaranteed().inBackground().forPath(statusPath);
|
cf.delete().guaranteed().inBackground().forPath(event.getData().getPath());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
} catch(Exception e) {
|
||||||
catch (Exception e) {
|
log.makeAlert(e, "Failed to handle new worker status")
|
||||||
log.error(e, "Exception in status listener");
|
.addData("worker", worker.getHost())
|
||||||
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
|
.addData("znode", event.getData().getPath())
|
||||||
|
.emit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue