WorkerTaskManager to create disk files atomically and ignore task file corruption (#7917)

* WorkerTaskManager to create disk files atomically and ignore task file
corruptions

* fixing weird checkstyle lambda indentation issues
This commit is contained in:
Himanshu 2019-06-18 09:18:43 -07:00 committed by Fangjin Yang
parent 20d1db9dff
commit 417fcef385
2 changed files with 50 additions and 12 deletions

View File

@ -176,6 +176,15 @@ public class FileUtils
/** /**
* Write to a file atomically, by first writing to a temporary file in the same directory and then moving it to * Write to a file atomically, by first writing to a temporary file in the same directory and then moving it to
* the target location. More docs at {@link FileUtils#writeAtomically(File, File, OutputStreamConsumer)} .
*/
public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException
{
return writeAtomically(file, file.getParentFile(), f);
}
/**
* Write to a file atomically, by first writing to a temporary file in given tmpDir directory and then moving it to
* the target location. This function attempts to clean up its temporary files when possible, but they may stick * the target location. This function attempts to clean up its temporary files when possible, but they may stick
* around (for example, if the JVM crashes partway through executing the function). In any case, the target file * around (for example, if the JVM crashes partway through executing the function). In any case, the target file
* should be unharmed. * should be unharmed.
@ -186,12 +195,7 @@ public class FileUtils
* *
* This method is not just thread-safe, but is also safe to use from multiple processes on the same machine. * This method is not just thread-safe, but is also safe to use from multiple processes on the same machine.
*/ */
public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException public static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException
{
return writeAtomically(file, file.getParentFile(), f);
}
private static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException
{ {
final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID())); final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID()));

View File

@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
@ -129,6 +130,7 @@ public abstract class WorkerTaskManager
synchronized (lock) { synchronized (lock) {
try { try {
log.info("Starting..."); log.info("Starting...");
cleanupAndMakeTmpTaskDir();
registerLocationListener(); registerLocationListener();
restoreRestorableTasks(); restoreRestorableTasks();
initAssignedTasks(); initAssignedTasks();
@ -264,7 +266,12 @@ public abstract class WorkerTaskManager
} }
try { try {
jsonMapper.writeValue(new File(getAssignedTaskDir(), task.getId()), task); FileUtils.writeAtomically(new File(getAssignedTaskDir(), task.getId()), getTmpTaskDir(),
os -> {
jsonMapper.writeValue(os, task);
return null;
}
);
assignedTasks.put(task.getId(), task); assignedTasks.put(task.getId(), task);
} }
catch (IOException ex) { catch (IOException ex) {
@ -286,6 +293,28 @@ public abstract class WorkerTaskManager
submitNoticeToExec(new RunNotice(task)); submitNoticeToExec(new RunNotice(task));
} }
private File getTmpTaskDir()
{
return new File(taskConfig.getBaseTaskDir(), "workerTaskManagerTmp");
}
private void cleanupAndMakeTmpTaskDir()
{
File tmpDir = getTmpTaskDir();
tmpDir.mkdirs();
if (!tmpDir.isDirectory()) {
throw new ISE("Tmp Tasks Dir [%s] does not exist/not-a-directory.", tmpDir);
}
// Delete any tmp files left out from before due to jvm crash.
try {
org.apache.commons.io.FileUtils.cleanDirectory(tmpDir);
}
catch (IOException ex) {
log.warn("Failed to cleanup tmp dir [%s].", tmpDir.getAbsolutePath());
}
}
public File getAssignedTaskDir() public File getAssignedTaskDir()
{ {
return new File(taskConfig.getBaseTaskDir(), "assignedTasks"); return new File(taskConfig.getBaseTaskDir(), "assignedTasks");
@ -311,11 +340,11 @@ public abstract class WorkerTaskManager
assignedTasks.put(taskId, task); assignedTasks.put(taskId, task);
log.info("Found assigned task[%s].", taskId); log.info("Found assigned task[%s].", taskId);
} else { } else {
throw new ISE("Corrupted assigned task on disk[%s].", taskFile.getAbsoluteFile()); throw new ISE("WTF! Corrupted assigned task on disk[%s].", taskFile.getAbsoluteFile());
} }
} }
catch (IOException ex) { catch (IOException ex) {
throw new ISE(ex, "Failed to read assigned task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile()); log.error(ex, "Failed to read assigned task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
} }
} }
@ -395,7 +424,12 @@ public abstract class WorkerTaskManager
completedTasks.put(taskId, taskAnnouncement); completedTasks.put(taskId, taskAnnouncement);
try { try {
jsonMapper.writeValue(new File(getCompletedTaskDir(), taskId), taskAnnouncement); FileUtils.writeAtomically(new File(getCompletedTaskDir(), taskId), getTmpTaskDir(),
os -> {
jsonMapper.writeValue(os, taskAnnouncement);
return null;
}
);
} }
catch (IOException ex) { catch (IOException ex) {
log.error(ex, "Error while trying to persist completed task[%s] announcement.", taskId); log.error(ex, "Error while trying to persist completed task[%s] announcement.", taskId);
@ -423,11 +457,11 @@ public abstract class WorkerTaskManager
completedTasks.put(taskId, taskAnnouncement); completedTasks.put(taskId, taskAnnouncement);
log.info("Found completed task[%s] with status[%s].", taskId, taskAnnouncement.getStatus()); log.info("Found completed task[%s] with status[%s].", taskId, taskAnnouncement.getStatus());
} else { } else {
throw new ISE("Corrupted completed task on disk[%s].", taskFile.getAbsoluteFile()); throw new ISE("WTF! Corrupted completed task on disk[%s].", taskFile.getAbsoluteFile());
} }
} }
catch (IOException ex) { catch (IOException ex) {
throw new ISE(ex, "Failed to read completed task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile()); log.error(ex, "Failed to read completed task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
} }
} }
} }