diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 42e70abef08..b344700a51a 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -405,37 +405,18 @@ public class RemoteTaskRunner implements TaskRunner @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - log.info("Event: %s", event.getType()); - if (event.getData() != null) { - log.info("Data: %s", event.getData().getPath()); - } try { if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { - final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - final TaskStatus taskStatus; + final TaskStatus taskStatus = jsonMapper.readValue( + event.getData().getData(), TaskStatus.class + ); // This can fail if a worker writes a bogus status. Retry if so. - try { - taskStatus = jsonMapper.readValue( - event.getData().getData(), TaskStatus.class - ); - - if (!taskStatus.getId().equals(taskId)) { - // Sanity check - throw new ISE( - "Worker[%s] status id does not match payload id: %s != %s", - worker.getHost(), - taskId, - taskStatus.getId() - ); - } - } - catch (Exception e) { - log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId); + if (!taskStatus.getId().equals(taskId)) { retryTask(runningTasks.get(taskId), worker.getHost()); - throw Throwables.propagate(e); + return; } log.info( @@ -526,7 +507,10 @@ public class RemoteTaskRunner implements TaskRunner for (String taskId : tasksToRetry) { TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); if (taskRunnerWorkItem != null) { - cf.delete().guaranteed().forPath(JOINER.join(config.getTaskPath(), worker.getHost(), taskId)); + String taskPath = JOINER.join(config.getTaskPath(), worker.getHost(), taskId); + if (cf.checkExists().forPath(taskPath) != null) { + cf.delete().guaranteed().forPath(taskPath); + } retryTask(taskRunnerWorkItem, worker.getHost()); } else { log.warn("RemoteTaskRunner has no knowledge of task %s", taskId);