Prevent fsync from creating 0-byte files
This is related to LUCENE-5570 where fsync creates a 0-byte file if the file does not exists. This commit adds the patched lucene version using Java 7 APIs as well as a note to replace this method with the upcomeing IOUtils#fsync in Lucene 4.8 This commit cleans up FsImmutableBlobContainer#writeBlob to make use of Java7 Auto-Closing features and ensures that the directory the blob was written to is fsynced as well if possible.
This commit is contained in:
parent
11bf13c363
commit
a215dd3ae8
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.common.blobstore.fs;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
|
||||
|
@ -41,55 +40,38 @@ public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
|
||||
public void writeBlob(final String blobName, final InputStream stream, final long sizeInBytes, final WriterListener listener) {
|
||||
blobStore.executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
File file = new File(path, blobName);
|
||||
RandomAccessFile raf;
|
||||
try {
|
||||
raf = new RandomAccessFile(file, "rw");
|
||||
// clean the file if it exists
|
||||
raf.setLength(0);
|
||||
} catch (Throwable e) {
|
||||
listener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
final File file = new File(path, blobName);
|
||||
boolean success = false;
|
||||
try {
|
||||
boolean innerSuccess = false;
|
||||
try {
|
||||
try (final RandomAccessFile raf = new RandomAccessFile(file, "rw");
|
||||
final InputStream is = stream) {
|
||||
// clean the file if it exists
|
||||
raf.setLength(0);
|
||||
long bytesWritten = 0;
|
||||
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
|
||||
final byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
|
||||
int bytesRead;
|
||||
while ((bytesRead = is.read(buffer)) != -1) {
|
||||
raf.write(buffer, 0, bytesRead);
|
||||
bytesWritten += bytesRead;
|
||||
}
|
||||
if (bytesWritten != sizeInBytes) {
|
||||
listener.onFailure(new ElasticsearchIllegalStateException("[" + blobName + "]: wrote [" + bytesWritten + "], expected to write [" + sizeInBytes + "]"));
|
||||
return;
|
||||
}
|
||||
innerSuccess = true;
|
||||
} finally {
|
||||
if (innerSuccess) {
|
||||
IOUtils.close(is, raf);
|
||||
} else {
|
||||
IOUtils.closeWhileHandlingException(is, raf);
|
||||
throw new ElasticsearchIllegalStateException("[" + blobName + "]: wrote [" + bytesWritten + "], expected to write [" + sizeInBytes + "]");
|
||||
}
|
||||
// fsync the FD we are done with writing
|
||||
raf.getFD().sync();
|
||||
// try to fsync the directory to make sure all metadata is written to
|
||||
// the storage device - NOTE: if it's a dir it will not throw any exception
|
||||
FileSystemUtils.syncFile(path, true);
|
||||
}
|
||||
FileSystemUtils.syncFile(file);
|
||||
success = true;
|
||||
} catch (Throwable e) {
|
||||
listener.onFailure(e);
|
||||
// just on the safe size, try and delete it on failure
|
||||
try {
|
||||
if (file.exists()) {
|
||||
file.delete();
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
// ignore
|
||||
}
|
||||
FileSystemUtils.tryDeleteFile(file);
|
||||
} finally {
|
||||
if (success) {
|
||||
listener.onCompleted();
|
||||
|
@ -103,4 +85,4 @@ public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements
|
|||
public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
|
||||
BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,12 +19,17 @@
|
|||
|
||||
package org.elasticsearch.common.io;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.ThreadInterruptedException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -172,33 +177,56 @@ public class FileSystemUtils {
|
|||
return false;
|
||||
}
|
||||
|
||||
public static void syncFile(File fileToSync) throws IOException {
|
||||
boolean success = false;
|
||||
int retryCount = 0;
|
||||
static {
|
||||
assert Version.CURRENT.luceneVersion == org.apache.lucene.util.Version.LUCENE_47 : "Use IOUtils#fsync instead of syncFile in Lucene 4.8";
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure that any writes to the given file is written to the storage device that contains it.
|
||||
* @param fileToSync the file to fsync
|
||||
* @param isDir if true, the given file is a directory (we open for read and ignore IOExceptions,
|
||||
* because not all file systems and operating systems allow to fsync on a directory)
|
||||
*/
|
||||
public static void syncFile(File fileToSync, boolean isDir) throws IOException {
|
||||
IOException exc = null;
|
||||
while (!success && retryCount < 5) {
|
||||
retryCount++;
|
||||
RandomAccessFile file = null;
|
||||
try {
|
||||
|
||||
// If the file is a directory we have to open read-only, for regular files we must open r/w for the fsync to have an effect.
|
||||
// See http://blog.httrack.com/blog/2013/11/15/everything-you-always-wanted-to-know-about-fsync/
|
||||
try (final FileChannel file = FileChannel.open(fileToSync.toPath(), isDir ? StandardOpenOption.READ : StandardOpenOption.WRITE)) {
|
||||
for (int retry = 0; retry < 5; retry++) {
|
||||
try {
|
||||
file = new RandomAccessFile(fileToSync, "rw");
|
||||
file.getFD().sync();
|
||||
success = true;
|
||||
} finally {
|
||||
if (file != null)
|
||||
file.close();
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
if (exc == null)
|
||||
exc = ioe;
|
||||
try {
|
||||
// Pause 5 msec
|
||||
Thread.sleep(5);
|
||||
} catch (InterruptedException ie) {
|
||||
throw new InterruptedIOException(ie.getMessage());
|
||||
file.force(true);
|
||||
return;
|
||||
} catch (IOException ioe) {
|
||||
if (exc == null) {
|
||||
exc = ioe;
|
||||
}
|
||||
try {
|
||||
// Pause 5 msec
|
||||
Thread.sleep(5L);
|
||||
} catch (InterruptedException ie) {
|
||||
ThreadInterruptedException ex = new ThreadInterruptedException(ie);
|
||||
ex.addSuppressed(exc);
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
if (exc == null) {
|
||||
exc = ioe;
|
||||
}
|
||||
}
|
||||
|
||||
if (isDir) {
|
||||
assert (Constants.LINUX || Constants.MAC_OS_X) == false :
|
||||
"On Linux and MacOSX fsyncing a directory should not throw IOException, "+
|
||||
"we just don't want to rely on that in production (undocumented). Got: " + exc;
|
||||
// Ignore exception if it is a directory
|
||||
return;
|
||||
}
|
||||
|
||||
// Throw original exception
|
||||
throw exc;
|
||||
}
|
||||
|
||||
public static void copyFile(File sourceFile, File destinationFile) throws IOException {
|
||||
|
@ -250,7 +278,13 @@ public class FileSystemUtils {
|
|||
return true;
|
||||
}
|
||||
|
||||
private FileSystemUtils() {
|
||||
private FileSystemUtils() {}
|
||||
|
||||
public static void tryDeleteFile(File file) {
|
||||
try {
|
||||
file.delete();
|
||||
} catch (SecurityException e1) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue