FileUtils: Sync directory entry too on writeAtomically. (#6677)

* FileUtils: Sync directory entry too on writeAtomically.

See the fsync(2) man page for why this is important:
https://linux.die.net/man/2/fsync

This also plumbs CompressionUtils's "zip" function through
writeAtomically, so the code for handling atomic local filesystem
writes is all done in the same place.

* Remove unused import.

* Avoid FileOutputStream.

* Allow non-atomic writes to overwrite.

* Add some comments. Also, there is no need to flush an unbuffered stream.
This commit is contained in:
Gian Merlino 2018-12-08 08:12:59 -08:00 committed by Roman Leventov
parent bbb283fa34
commit b7709e1245
6 changed files with 79 additions and 27 deletions

View File

@ -41,7 +41,10 @@ import java.io.FilterInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
@ -78,16 +81,19 @@ public class CompressionUtils
log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory); log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
} }
try (final FileOutputStream out = new FileOutputStream(outputZipFile)) {
long bytes = zip(directory, out);
// For explanation of why fsyncing here is a good practice:
// https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984
if (fsync) { if (fsync) {
out.getChannel().force(true); return FileUtils.writeAtomically(outputZipFile, out -> zip(directory, out));
} else {
try (
final FileChannel fileChannel = FileChannel.open(
outputZipFile.toPath(),
StandardOpenOption.WRITE,
StandardOpenOption.CREATE
);
final OutputStream out = Channels.newOutputStream(fileChannel)
) {
return zip(directory, out);
} }
return bytes;
} }
} }

View File

@ -24,16 +24,19 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import com.google.common.io.Files; import com.google.common.io.Files;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilterOutputStream; import java.io.FilterOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.MappedByteBuffer; import java.nio.MappedByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -41,6 +44,8 @@ import java.util.UUID;
public class FileUtils public class FileUtils
{ {
private static final Logger log = new Logger(FileUtils.class);
/** /**
* Useful for retry functionality that doesn't want to stop Throwables, but does want to retry on Exceptions * Useful for retry functionality that doesn't want to stop Throwables, but does want to retry on Exceptions
*/ */
@ -182,22 +187,35 @@ public class FileUtils
* *
* This method is not just thread-safe, but is also safe to use from multiple processes on the same machine. * This method is not just thread-safe, but is also safe to use from multiple processes on the same machine.
*/ */
public static void writeAtomically(final File file, OutputStreamConsumer f) throws IOException public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException
{ {
writeAtomically(file, file.getParentFile(), f); return writeAtomically(file, file.getParentFile(), f);
} }
private static void writeAtomically(final File file, final File tmpDir, OutputStreamConsumer f) throws IOException private static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException
{ {
final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID())); final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID()));
try { //noinspection unused
try (final FileOutputStream out = new FileOutputStream(tmpFile)) { try (final Closeable deleter = () -> java.nio.file.Files.deleteIfExists(tmpFile.toPath())) {
final T retVal;
try (
final FileChannel fileChannel = FileChannel.open(
tmpFile.toPath(),
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW
);
final OutputStream out = Channels.newOutputStream(fileChannel)
) {
// Pass f an uncloseable stream so we can fsync before closing. // Pass f an uncloseable stream so we can fsync before closing.
f.accept(uncloseable(out)); retVal = f.apply(uncloseable(out));
// fsync to avoid write-then-rename-then-crash causing empty files on some filesystems. // fsync to avoid write-then-rename-then-crash causing empty files on some filesystems.
out.getChannel().force(true); // Must do this before "out" or "fileChannel" is closed. No need to flush "out" first, since
// Channels.newOutputStream is unbuffered.
// See also https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984
fileChannel.force(true);
} }
// No exception thrown; do the move. // No exception thrown; do the move.
@ -207,9 +225,13 @@ public class FileUtils
StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.ATOMIC_MOVE,
StandardCopyOption.REPLACE_EXISTING StandardCopyOption.REPLACE_EXISTING
); );
// fsync the directory entry to ensure the new file will be visible after a crash.
try (final FileChannel directory = FileChannel.open(file.getParentFile().toPath(), StandardOpenOption.READ)) {
directory.force(true);
} }
finally {
tmpFile.delete(); return retVal;
} }
} }
@ -225,8 +247,8 @@ public class FileUtils
}; };
} }
public interface OutputStreamConsumer public interface OutputStreamConsumer<T>
{ {
void accept(OutputStream outputStream) throws IOException; T apply(OutputStream outputStream) throws IOException;
} }
} }

View File

@ -55,7 +55,10 @@ public class FileUtilsTest
{ {
final File tmpDir = folder.newFolder(); final File tmpDir = folder.newFolder();
final File tmpFile = new File(tmpDir, "file1"); final File tmpFile = new File(tmpDir, "file1");
FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("foo"))); FileUtils.writeAtomically(tmpFile, out -> {
out.write(StringUtils.toUtf8("foo"));
return null;
});
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));
// Try writing again, throw error partway through. // Try writing again, throw error partway through.
@ -71,7 +74,10 @@ public class FileUtilsTest
} }
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));
FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("baz"))); FileUtils.writeAtomically(tmpFile, out -> {
out.write(StringUtils.toUtf8("baz"));
return null;
});
Assert.assertEquals("baz", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); Assert.assertEquals("baz", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));
} }
} }

View File

@ -236,7 +236,13 @@ public class CoordinatorPollingBasicAuthenticatorCacheManager implements BasicAu
File cacheDir = new File(commonCacheConfig.getCacheDirectory()); File cacheDir = new File(commonCacheConfig.getCacheDirectory());
cacheDir.mkdirs(); cacheDir.mkdirs();
File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserMapFilename(prefix)); File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserMapFilename(prefix));
FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes)); FileUtils.writeAtomically(
userMapFile,
out -> {
out.write(userMapBytes);
return null;
}
);
} }
private Map<String, BasicAuthenticatorUser> tryFetchUserMapFromCoordinator(String prefix) throws Exception private Map<String, BasicAuthenticatorUser> tryFetchUserMapFromCoordinator(String prefix) throws Exception

View File

@ -212,7 +212,13 @@ public class CoordinatorPollingBasicAuthorizerCacheManager implements BasicAutho
File cacheDir = new File(commonCacheConfig.getCacheDirectory()); File cacheDir = new File(commonCacheConfig.getCacheDirectory());
cacheDir.mkdirs(); cacheDir.mkdirs();
File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserRoleMapFilename(prefix)); File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserRoleMapFilename(prefix));
FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes)); FileUtils.writeAtomically(
userMapFile,
out -> {
out.write(userMapBytes);
return null;
}
);
} }
@Nullable @Nullable

View File

@ -88,7 +88,13 @@ public class LookupSnapshotTaker
final File persistFile = getPersistFile(tier); final File persistFile = getPersistFile(tier);
try { try {
FileUtils.writeAtomically(persistFile, out -> objectMapper.writeValue(out, lookups)); FileUtils.writeAtomically(
persistFile,
out -> {
objectMapper.writeValue(out, lookups);
return null;
}
);
} }
catch (IOException e) { catch (IOException e) {
throw new ISE(e, "Exception during serialization of lookups using file [%s]", persistFile.getAbsolutePath()); throw new ISE(e, "Exception during serialization of lookups using file [%s]", persistFile.getAbsolutePath());