diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 9c21ce43d6f..47f53097022 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -498,16 +498,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes()); } - cf.create() - .withMode(CreateMode.EPHEMERAL) - .forPath( - JOINER.join( - config.getIndexerTaskPath(), - theWorker.getHost(), - task.getId() - ), - rawBytes - ); + String taskPath = JOINER.join(config.getIndexerTaskPath(), theWorker.getHost(), task.getId()); + + if (cf.checkExists().forPath(taskPath) == null) { + cf.create() + .withMode(CreateMode.EPHEMERAL) + .forPath( + taskPath, rawBytes + ); + } runningTasks.put(task.getId(), pendingTasks.remove(task.getId())); log.info("Task %s switched from pending to running", task.getId());