mirror of https://github.com/apache/druid.git
Fix races in LookupSnapshotTaker, CoordinatorPollingBasicAuthenticatorCacheManager (#5344)
* Fix races in LookupSnapshotTaker, CoordinatorPollingBasicAuthenticatorCacheManager. Both were susceptible to the following conditions: 1. Two JVMs on the same machine (perhaps two peons) could conflict by one reading while the other was writing, or by writing to the file at the same time. 2. One JVM could partially write a file, then crash, leaving a truncated file. * Use StringUtils.format
This commit is contained in:
parent
37c09ce3f8
commit
8c738c7076
|
@ -21,7 +21,6 @@ package io.druid.security.basic.authentication.db.cache;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.io.Files;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import io.druid.client.coordinator.Coordinator;
|
||||
|
@ -29,6 +28,7 @@ import io.druid.concurrent.LifecycleLock;
|
|||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.java.util.common.FileUtils;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.RetryUtils;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
|
@ -236,7 +236,7 @@ public class CoordinatorPollingBasicAuthenticatorCacheManager implements BasicAu
|
|||
File cacheDir = new File(commonCacheConfig.getCacheDirectory());
|
||||
cacheDir.mkdirs();
|
||||
File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserMapFilename(prefix));
|
||||
Files.write(userMapBytes, userMapFile);
|
||||
FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes));
|
||||
}
|
||||
|
||||
private Map<String, BasicAuthenticatorUser> tryFetchUserMapFromCoordinator(String prefix) throws Exception
|
||||
|
|
|
@ -27,11 +27,16 @@ import com.google.common.io.Files;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FilterOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.MappedByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.UUID;
|
||||
|
||||
public class FileUtils
|
||||
{
|
||||
|
@ -46,6 +51,7 @@ public class FileUtils
|
|||
return input instanceof Exception;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Copy input byte source to outFile. If outFile exists, it is attempted to be deleted.
|
||||
*
|
||||
|
@ -150,10 +156,11 @@ public class FileUtils
|
|||
* }}</pre>
|
||||
*
|
||||
* @param file the file to map
|
||||
* @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file}
|
||||
* @throws FileNotFoundException if the {@code file} does not exist
|
||||
* @throws IOException if an I/O error occurs
|
||||
*
|
||||
* @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file}
|
||||
*
|
||||
* @throws FileNotFoundException if the {@code file} does not exist
|
||||
* @throws IOException if an I/O error occurs
|
||||
* @see FileChannel#map(FileChannel.MapMode, long, long)
|
||||
*/
|
||||
public static MappedByteBufferHandler map(File file) throws IOException
|
||||
|
@ -161,4 +168,64 @@ public class FileUtils
|
|||
MappedByteBuffer mappedByteBuffer = Files.map(file);
|
||||
return new MappedByteBufferHandler(mappedByteBuffer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write to a file atomically, by first writing to a temporary file in the same directory and then moving it to
|
||||
* the target location. This function attempts to clean up its temporary files when possible, but they may stick
|
||||
* around (for example, if the JVM crashes partway through executing the function). In any case, the target file
|
||||
* should be unharmed.
|
||||
*
|
||||
* The OutputStream passed to the consumer is uncloseable; calling close on it will do nothing. This is to ensure
|
||||
* that the stream stays open so we can fsync it here before closing. Hopefully, this doesn't cause any problems
|
||||
* for callers.
|
||||
*
|
||||
* This method is not just thread-safe, but is also safe to use from multiple processes on the same machine.
|
||||
*/
|
||||
public static void writeAtomically(final File file, OutputStreamConsumer f) throws IOException
|
||||
{
|
||||
writeAtomically(file, file.getParentFile(), f);
|
||||
}
|
||||
|
||||
private static void writeAtomically(final File file, final File tmpDir, OutputStreamConsumer f) throws IOException
|
||||
{
|
||||
final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID()));
|
||||
|
||||
try {
|
||||
try (final FileOutputStream out = new FileOutputStream(tmpFile)) {
|
||||
// Pass f an uncloseable stream so we can fsync before closing.
|
||||
f.accept(uncloseable(out));
|
||||
|
||||
// fsync to avoid write-then-rename-then-crash causing empty files on some filesystems.
|
||||
out.getChannel().force(true);
|
||||
}
|
||||
|
||||
// No exception thrown; do the move.
|
||||
java.nio.file.Files.move(
|
||||
tmpFile.toPath(),
|
||||
file.toPath(),
|
||||
StandardCopyOption.ATOMIC_MOVE,
|
||||
StandardCopyOption.REPLACE_EXISTING
|
||||
);
|
||||
}
|
||||
finally {
|
||||
tmpFile.delete();
|
||||
}
|
||||
}
|
||||
|
||||
private static OutputStream uncloseable(final OutputStream out) throws IOException
|
||||
{
|
||||
return new FilterOutputStream(out)
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
// Do nothing.
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public interface OutputStreamConsumer
|
||||
{
|
||||
void accept(OutputStream outputStream) throws IOException;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.junit.rules.TemporaryFolder;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.file.Files;
|
||||
|
||||
public class FileUtilsTest
|
||||
{
|
||||
|
@ -48,4 +49,29 @@ public class FileUtilsTest
|
|||
long buffersMemoryAfter = BufferUtils.totalMemoryUsedByDirectAndMappedBuffers();
|
||||
Assert.assertEquals(buffersMemoryBefore, buffersMemoryAfter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteAtomically() throws IOException
|
||||
{
|
||||
final File tmpDir = folder.newFolder();
|
||||
final File tmpFile = new File(tmpDir, "file1");
|
||||
FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("foo")));
|
||||
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));
|
||||
|
||||
// Try writing again, throw error partway through.
|
||||
try {
|
||||
FileUtils.writeAtomically(tmpFile, out -> {
|
||||
out.write(StringUtils.toUtf8("bar"));
|
||||
out.flush();
|
||||
throw new ISE("OMG!");
|
||||
});
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
// Suppress
|
||||
}
|
||||
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));
|
||||
|
||||
FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("baz")));
|
||||
Assert.assertEquals("baz", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.java.util.common.FileUtils;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
|
||||
|
@ -50,7 +51,10 @@ public class LookupSnapshotTaker
|
|||
)
|
||||
{
|
||||
this.objectMapper = jsonMapper;
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(persistDirectory), "can not work without specifying persistDirectory");
|
||||
Preconditions.checkArgument(
|
||||
!Strings.isNullOrEmpty(persistDirectory),
|
||||
"can not work without specifying persistDirectory"
|
||||
);
|
||||
this.persistDirectory = new File(persistDirectory);
|
||||
if (!this.persistDirectory.exists()) {
|
||||
Preconditions.checkArgument(this.persistDirectory.mkdirs(), "Oups was not able to create persist directory");
|
||||
|
@ -72,7 +76,7 @@ public class LookupSnapshotTaker
|
|||
LOGGER.warn("found empty file no lookups to load from [%s]", persistFile.getAbsolutePath());
|
||||
return Collections.emptyList();
|
||||
}
|
||||
lookupBeanList = objectMapper.readValue(persistFile, new TypeReference<List<LookupBean>>(){});
|
||||
lookupBeanList = objectMapper.readValue(persistFile, new TypeReference<List<LookupBean>>() {});
|
||||
return lookupBeanList;
|
||||
}
|
||||
catch (IOException e) {
|
||||
|
@ -83,7 +87,7 @@ public class LookupSnapshotTaker
|
|||
public synchronized void takeSnapshot(List<LookupBean> lookups)
|
||||
{
|
||||
try {
|
||||
objectMapper.writeValue(persistFile, lookups);
|
||||
FileUtils.writeAtomically(persistFile, out -> objectMapper.writeValue(out, lookups));
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new ISE(e, "Exception during serialization of lookups using file [%s]", persistFile.getAbsolutePath());
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -43,6 +44,10 @@ public class LookupSnapshotTakerTest
|
|||
{
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
private final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||
|
||||
|
||||
|
@ -112,7 +117,7 @@ public class LookupSnapshotTakerTest
|
|||
Assert.assertEquals(lookupBeanList, actualList);
|
||||
}
|
||||
|
||||
@Test(expected = ISE.class)
|
||||
@Test
|
||||
public void testIOExceptionDuringLookupPersist() throws IOException
|
||||
{
|
||||
File directory = temporaryFolder.newFolder();
|
||||
|
@ -120,6 +125,7 @@ public class LookupSnapshotTakerTest
|
|||
Assert.assertFalse(snapshotFile.exists());
|
||||
Assert.assertTrue(snapshotFile.createNewFile());
|
||||
Assert.assertTrue(snapshotFile.setReadOnly());
|
||||
Assert.assertTrue(snapshotFile.getParentFile().setReadOnly());
|
||||
LookupSnapshotTaker lookupSnapshotTaker = new LookupSnapshotTaker(mapper, directory.getAbsolutePath());
|
||||
LookupBean lookupBean = new LookupBean(
|
||||
"name",
|
||||
|
@ -135,10 +141,12 @@ public class LookupSnapshotTakerTest
|
|||
)
|
||||
);
|
||||
List<LookupBean> lookupBeanList = Lists.newArrayList(lookupBean);
|
||||
|
||||
expectedException.expect(ISE.class);
|
||||
expectedException.expectMessage("Exception during serialization of lookups");
|
||||
lookupSnapshotTaker.takeSnapshot(lookupBeanList);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void tesLookupPullingFromEmptyFile() throws IOException
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue