diff --git a/extensions-core/druid-basic-security/src/main/java/io/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java b/extensions-core/druid-basic-security/src/main/java/io/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java index c7d6cad4d0f..fe5dd7e679a 100644 --- a/extensions-core/druid-basic-security/src/main/java/io/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java +++ b/extensions-core/druid-basic-security/src/main/java/io/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java @@ -21,7 +21,6 @@ package io.druid.security.basic.authentication.db.cache; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import com.google.common.io.Files; import com.google.inject.Inject; import com.google.inject.Injector; import io.druid.client.coordinator.Coordinator; @@ -29,6 +28,7 @@ import io.druid.concurrent.LifecycleLock; import io.druid.discovery.DruidLeaderClient; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Smile; +import io.druid.java.util.common.FileUtils; import io.druid.java.util.common.ISE; import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; @@ -236,7 +236,7 @@ public class CoordinatorPollingBasicAuthenticatorCacheManager implements BasicAu File cacheDir = new File(commonCacheConfig.getCacheDirectory()); cacheDir.mkdirs(); File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserMapFilename(prefix)); - Files.write(userMapBytes, userMapFile); + FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes)); } private Map tryFetchUserMapFromCoordinator(String prefix) throws Exception diff --git a/java-util/src/main/java/io/druid/java/util/common/FileUtils.java b/java-util/src/main/java/io/druid/java/util/common/FileUtils.java index 72be3a57e18..0a5d0806388 100644 --- a/java-util/src/main/java/io/druid/java/util/common/FileUtils.java +++ b/java-util/src/main/java/io/druid/java/util/common/FileUtils.java @@ -27,11 +27,16 @@ import com.google.common.io.Files; import java.io.File; import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FilterOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.StandardCopyOption; import java.util.Arrays; import java.util.Collection; +import java.util.UUID; public class FileUtils { @@ -46,6 +51,7 @@ public class FileUtils return input instanceof Exception; } }; + /** * Copy input byte source to outFile. If outFile exists, it is attempted to be deleted. * @@ -150,10 +156,11 @@ public class FileUtils * }} * * @param file the file to map - * @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file} - * @throws FileNotFoundException if the {@code file} does not exist - * @throws IOException if an I/O error occurs * + * @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file} + * + * @throws FileNotFoundException if the {@code file} does not exist + * @throws IOException if an I/O error occurs * @see FileChannel#map(FileChannel.MapMode, long, long) */ public static MappedByteBufferHandler map(File file) throws IOException @@ -161,4 +168,64 @@ public class FileUtils MappedByteBuffer mappedByteBuffer = Files.map(file); return new MappedByteBufferHandler(mappedByteBuffer); } + + /** + * Write to a file atomically, by first writing to a temporary file in the same directory and then moving it to + * the target location. This function attempts to clean up its temporary files when possible, but they may stick + * around (for example, if the JVM crashes partway through executing the function). In any case, the target file + * should be unharmed. + * + * The OutputStream passed to the consumer is uncloseable; calling close on it will do nothing. This is to ensure + * that the stream stays open so we can fsync it here before closing. Hopefully, this doesn't cause any problems + * for callers. + * + * This method is not just thread-safe, but is also safe to use from multiple processes on the same machine. + */ + public static void writeAtomically(final File file, OutputStreamConsumer f) throws IOException + { + writeAtomically(file, file.getParentFile(), f); + } + + private static void writeAtomically(final File file, final File tmpDir, OutputStreamConsumer f) throws IOException + { + final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID())); + + try { + try (final FileOutputStream out = new FileOutputStream(tmpFile)) { + // Pass f an uncloseable stream so we can fsync before closing. + f.accept(uncloseable(out)); + + // fsync to avoid write-then-rename-then-crash causing empty files on some filesystems. + out.getChannel().force(true); + } + + // No exception thrown; do the move. + java.nio.file.Files.move( + tmpFile.toPath(), + file.toPath(), + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING + ); + } + finally { + tmpFile.delete(); + } + } + + private static OutputStream uncloseable(final OutputStream out) throws IOException + { + return new FilterOutputStream(out) + { + @Override + public void close() throws IOException + { + // Do nothing. + } + }; + } + + public interface OutputStreamConsumer + { + void accept(OutputStream outputStream) throws IOException; + } } diff --git a/java-util/src/test/java/io/druid/java/util/common/FileUtilsTest.java b/java-util/src/test/java/io/druid/java/util/common/FileUtilsTest.java index a9594f7a02a..3e4aea4eca8 100644 --- a/java-util/src/test/java/io/druid/java/util/common/FileUtilsTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/FileUtilsTest.java @@ -27,6 +27,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.file.Files; public class FileUtilsTest { @@ -48,4 +49,29 @@ public class FileUtilsTest long buffersMemoryAfter = BufferUtils.totalMemoryUsedByDirectAndMappedBuffers(); Assert.assertEquals(buffersMemoryBefore, buffersMemoryAfter); } + + @Test + public void testWriteAtomically() throws IOException + { + final File tmpDir = folder.newFolder(); + final File tmpFile = new File(tmpDir, "file1"); + FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("foo"))); + Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); + + // Try writing again, throw error partway through. + try { + FileUtils.writeAtomically(tmpFile, out -> { + out.write(StringUtils.toUtf8("bar")); + out.flush(); + throw new ISE("OMG!"); + }); + } + catch (IllegalStateException e) { + // Suppress + } + Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); + + FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("baz"))); + Assert.assertEquals("baz", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath()))); + } } diff --git a/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java b/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java index 3d67124610f..9162bf4b203 100644 --- a/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java +++ b/processing/src/main/java/io/druid/query/lookup/LookupSnapshotTaker.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import io.druid.guice.annotations.Json; +import io.druid.java.util.common.FileUtils; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; @@ -50,7 +51,10 @@ public class LookupSnapshotTaker ) { this.objectMapper = jsonMapper; - Preconditions.checkArgument(!Strings.isNullOrEmpty(persistDirectory), "can not work without specifying persistDirectory"); + Preconditions.checkArgument( + !Strings.isNullOrEmpty(persistDirectory), + "can not work without specifying persistDirectory" + ); this.persistDirectory = new File(persistDirectory); if (!this.persistDirectory.exists()) { Preconditions.checkArgument(this.persistDirectory.mkdirs(), "Oups was not able to create persist directory"); @@ -72,7 +76,7 @@ public class LookupSnapshotTaker LOGGER.warn("found empty file no lookups to load from [%s]", persistFile.getAbsolutePath()); return Collections.emptyList(); } - lookupBeanList = objectMapper.readValue(persistFile, new TypeReference>(){}); + lookupBeanList = objectMapper.readValue(persistFile, new TypeReference>() {}); return lookupBeanList; } catch (IOException e) { @@ -83,7 +87,7 @@ public class LookupSnapshotTaker public synchronized void takeSnapshot(List lookups) { try { - objectMapper.writeValue(persistFile, lookups); + FileUtils.writeAtomically(persistFile, out -> objectMapper.writeValue(out, lookups)); } catch (IOException e) { throw new ISE(e, "Exception during serialization of lookups using file [%s]", persistFile.getAbsolutePath()); diff --git a/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java b/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java index 3fc085623ff..ac63bbdf01e 100644 --- a/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java +++ b/server/src/test/java/io/druid/query/lookup/LookupSnapshotTakerTest.java @@ -31,6 +31,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import java.io.File; @@ -43,6 +44,10 @@ public class LookupSnapshotTakerTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private final ObjectMapper mapper = TestHelper.makeJsonMapper(); @@ -112,7 +117,7 @@ public class LookupSnapshotTakerTest Assert.assertEquals(lookupBeanList, actualList); } - @Test(expected = ISE.class) + @Test public void testIOExceptionDuringLookupPersist() throws IOException { File directory = temporaryFolder.newFolder(); @@ -120,6 +125,7 @@ public class LookupSnapshotTakerTest Assert.assertFalse(snapshotFile.exists()); Assert.assertTrue(snapshotFile.createNewFile()); Assert.assertTrue(snapshotFile.setReadOnly()); + Assert.assertTrue(snapshotFile.getParentFile().setReadOnly()); LookupSnapshotTaker lookupSnapshotTaker = new LookupSnapshotTaker(mapper, directory.getAbsolutePath()); LookupBean lookupBean = new LookupBean( "name", @@ -135,10 +141,12 @@ public class LookupSnapshotTakerTest ) ); List lookupBeanList = Lists.newArrayList(lookupBean); + + expectedException.expect(ISE.class); + expectedException.expectMessage("Exception during serialization of lookups"); lookupSnapshotTaker.takeSnapshot(lookupBeanList); } - @Test public void tesLookupPullingFromEmptyFile() throws IOException {