diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 15d7db56084..35fe72db968 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -45,6 +45,7 @@ import com.netflix.curator.framework.CuratorFramework; import com.netflix.curator.framework.recipes.cache.PathChildrenCache; import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent; import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener; +import com.netflix.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; import org.codehaus.jackson.map.ObjectMapper; import org.joda.time.DateTime; @@ -383,45 +384,71 @@ public class RemoteTaskRunner implements TaskRunner public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { synchronized (statusLock) { - String taskId = null; try { - if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { - TaskStatus taskStatus = jsonMapper.readValue( - event.getData().getData(), TaskStatus.class - ); - log.info("New status[%s] appeared!", taskStatus.getId()); - statusLock.notify(); - } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { - String statusPath = event.getData().getPath(); - TaskStatus taskStatus = jsonMapper.readValue( - event.getData().getData(), TaskStatus.class - ); - taskId = taskStatus.getId(); + if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || + event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) + { + final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); + final TaskStatus taskStatus; - log.info("Task[%s] updated status[%s]!", taskId, taskStatus.getStatusCode()); + // This can fail if a worker writes a bogus status. Retry if so. + try { + taskStatus = jsonMapper.readValue( + event.getData().getData(), TaskStatus.class + ); + + if(!taskStatus.getId().equals(taskId)) { + // Sanity check + throw new ISE( + "Worker[%s] status id does not match payload id: %s != %s", + worker.getHost(), + taskId, + taskStatus.getId() + ); + } + } catch (Exception e) { + log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId); + retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId)); + throw Throwables.propagate(e); + } + + log.info( + "Worker[%s] wrote %s status for task: %s", + worker.getHost(), + taskStatus.getStatusCode(), + taskId + ); + + statusLock.notify(); if (taskStatus.isComplete()) { + // Worker is done with this task workerWrapper.setLastCompletedTaskTime(new DateTime()); - TaskWrapper taskWrapper = tasks.get(taskId); + final TaskWrapper taskWrapper = tasks.get(taskId); if (taskWrapper == null) { - log.warn("A task completed that I didn't know about? WTF?!"); + log.warn( + "WTF?! Worker[%s] completed a task I didn't know about: %s", + worker.getHost(), + taskId + ); } else { - TaskCallback callback = taskWrapper.getCallback(); + final TaskCallback callback = taskWrapper.getCallback(); // Cleanup if (callback != null) { callback.notify(taskStatus); } tasks.remove(taskId); - cf.delete().guaranteed().inBackground().forPath(statusPath); + cf.delete().guaranteed().inBackground().forPath(event.getData().getPath()); } } } - } - catch (Exception e) { - log.error(e, "Exception in status listener"); - retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId)); + } catch(Exception e) { + log.makeAlert(e, "Failed to handle new worker status") + .addData("worker", worker.getHost()) + .addData("znode", event.getData().getPath()) + .emit(); } } }