diff --git a/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java b/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java index 79e010aa40c..ce94a4a8c96 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java @@ -41,7 +41,10 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; +import java.nio.file.StandardOpenOption; import java.util.Enumeration; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -78,16 +81,19 @@ public class CompressionUtils log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory); } - try (final FileOutputStream out = new FileOutputStream(outputZipFile)) { - long bytes = zip(directory, out); - - // For explanation of why fsyncing here is a good practice: - // https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984 - if (fsync) { - out.getChannel().force(true); + if (fsync) { + return FileUtils.writeAtomically(outputZipFile, out -> zip(directory, out)); + } else { + try ( + final FileChannel fileChannel = FileChannel.open( + outputZipFile.toPath(), + StandardOpenOption.WRITE, + StandardOpenOption.CREATE + ); + final OutputStream out = Channels.newOutputStream(fileChannel) + ) { + return zip(directory, out); } - - return bytes; } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java index 97ac6cb51ad..1ba63c10599 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java @@ -24,16 +24,19 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.io.ByteSource; import com.google.common.io.Files; +import org.apache.druid.java.util.common.logger.Logger; +import java.io.Closeable; import java.io.File; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.MappedByteBuffer; +import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -41,6 +44,8 @@ import java.util.UUID; public class FileUtils { + private static final Logger log = new Logger(FileUtils.class); + /** * Useful for retry functionality that doesn't want to stop Throwables, but does want to retry on Exceptions */ @@ -182,22 +187,35 @@ public class FileUtils * * This method is not just thread-safe, but is also safe to use from multiple processes on the same machine. */ - public static void writeAtomically(final File file, OutputStreamConsumer f) throws IOException + public static T writeAtomically(final File file, OutputStreamConsumer f) throws IOException { - writeAtomically(file, file.getParentFile(), f); + return writeAtomically(file, file.getParentFile(), f); } - private static void writeAtomically(final File file, final File tmpDir, OutputStreamConsumer f) throws IOException + private static T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer f) throws IOException { final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID())); - try { - try (final FileOutputStream out = new FileOutputStream(tmpFile)) { + //noinspection unused + try (final Closeable deleter = () -> java.nio.file.Files.deleteIfExists(tmpFile.toPath())) { + final T retVal; + + try ( + final FileChannel fileChannel = FileChannel.open( + tmpFile.toPath(), + StandardOpenOption.WRITE, + StandardOpenOption.CREATE_NEW + ); + final OutputStream out = Channels.newOutputStream(fileChannel) + ) { // Pass f an uncloseable stream so we can fsync before closing. - f.accept(uncloseable(out)); + retVal = f.apply(uncloseable(out)); // fsync to avoid write-then-rename-then-crash causing empty files on some filesystems. - out.getChannel().force(true); + // Must do this before "out" or "fileChannel" is closed. No need to flush "out" first, since + // Channels.newOutputStream is unbuffered. + // See also https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984 + fileChannel.force(true); } // No exception thrown; do the move. @@ -207,9 +225,13 @@ public class FileUtils StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING ); - } - finally { - tmpFile.delete(); + + // fsync the directory entry to ensure the new file will be visible after a crash. + try (final FileChannel directory = FileChannel.open(file.getParentFile().toPath(), StandardOpenOption.READ)) { + directory.force(true); + } + + return retVal; } } @@ -225,8 +247,8 @@ public class FileUtils }; } - public interface OutputStreamConsumer + public interface OutputStreamConsumer { - void accept(OutputStream outputStream) throws IOException; + T apply(OutputStream outputStream) throws IOException; } } diff --git a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java index ca8432cec31..746453f59d1 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java @@ -55,7 +55,10 @@ public class FileUtilsTest { final File tmpDir = folder.newFolder(); final File tmpFile = new File(tmpDir, "file1"); - FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("foo"))); + FileUtils.writeAtomically(tmpFile, out -> { + out.write(StringUtils.toUtf8("foo")); + return null; + }); Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); // Try writing again, throw error partway through. @@ -71,7 +74,10 @@ public class FileUtilsTest } Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); - FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("baz"))); + FileUtils.writeAtomically(tmpFile, out -> { + out.write(StringUtils.toUtf8("baz")); + return null; + }); Assert.assertEquals("baz", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); } } diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java index 035568feb54..2641280ed9d 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java @@ -236,7 +236,13 @@ public class CoordinatorPollingBasicAuthenticatorCacheManager implements BasicAu File cacheDir = new File(commonCacheConfig.getCacheDirectory()); cacheDir.mkdirs(); File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserMapFilename(prefix)); - FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes)); + FileUtils.writeAtomically( + userMapFile, + out -> { + out.write(userMapBytes); + return null; + } + ); } private Map tryFetchUserMapFromCoordinator(String prefix) throws Exception diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java index c3115c3a24b..29c3f572bc8 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java @@ -212,7 +212,13 @@ public class CoordinatorPollingBasicAuthorizerCacheManager implements BasicAutho File cacheDir = new File(commonCacheConfig.getCacheDirectory()); cacheDir.mkdirs(); File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserRoleMapFilename(prefix)); - FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes)); + FileUtils.writeAtomically( + userMapFile, + out -> { + out.write(userMapBytes); + return null; + } + ); } @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java b/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java index 6d60aafdedc..b2f21329e41 100644 --- a/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java +++ b/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java @@ -88,7 +88,13 @@ public class LookupSnapshotTaker final File persistFile = getPersistFile(tier); try { - FileUtils.writeAtomically(persistFile, out -> objectMapper.writeValue(out, lookups)); + FileUtils.writeAtomically( + persistFile, + out -> { + objectMapper.writeValue(out, lookups); + return null; + } + ); } catch (IOException e) { throw new ISE(e, "Exception during serialization of lookups using file [%s]", persistFile.getAbsolutePath());