mirror of https://github.com/apache/druid.git
Optimize chunkedCopy for sequential writes (#5477)
NativeIO.chunkedCopy fsyncs its writebuffer directly and requires an O_DIRECT RandomAccessFile. By allowing the kernel to start writing while filling the buffer the writes will be more constant. In addition the O_DIRECT flag is not required anymore and this will work faster in case fadvise is not supported on some system. This is based on Linus' post here: http://lkml.iu.edu/hypermail/linux/kernel/1005.2/01845.html
This commit is contained in:
parent
e096a8d6c5
commit
7d1163b0d9
|
@ -40,10 +40,28 @@ public class NativeIO
|
||||||
private static final Logger log = new Logger(NativeIO.class);
|
private static final Logger log = new Logger(NativeIO.class);
|
||||||
|
|
||||||
private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */
|
private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait upon writeout of all pages in the range before performing the write.
|
||||||
|
*/
|
||||||
|
private static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initiate writeout of all those dirty pages in the range which are not presently
|
||||||
|
* under writeback.
|
||||||
|
*/
|
||||||
|
private static final int SYNC_FILE_RANGE_WRITE = 2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait upon writeout of all pages in the range after performing the write.
|
||||||
|
*/
|
||||||
|
private static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
|
||||||
|
|
||||||
private static Field field;
|
private static Field field;
|
||||||
|
|
||||||
private static volatile boolean initialized = false;
|
private static volatile boolean initialized = false;
|
||||||
private static volatile boolean fadvisePossible = true;
|
private static volatile boolean fadvisePossible = true;
|
||||||
|
private static volatile boolean syncFileRangePossible = true;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
field = getFieldByReflection(FileDescriptor.class, "fd");
|
field = getFieldByReflection(FileDescriptor.class, "fd");
|
||||||
|
@ -64,6 +82,7 @@ public class NativeIO
|
||||||
}
|
}
|
||||||
|
|
||||||
private static native int posix_fadvise(int fd, long offset, long len, int flag) throws LastErrorException;
|
private static native int posix_fadvise(int fd, long offset, long len, int flag) throws LastErrorException;
|
||||||
|
private static native int sync_file_range(int fd, long offset, long len, int flags);
|
||||||
|
|
||||||
private NativeIO() {}
|
private NativeIO() {}
|
||||||
|
|
||||||
|
@ -135,10 +154,47 @@ public class NativeIO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sync part of an open file to the file system.
|
||||||
|
*
|
||||||
|
* @param fd The file descriptor of the source file.
|
||||||
|
* @param offset The offset within the file.
|
||||||
|
* @param nbytes The number of bytes to be synced.
|
||||||
|
* @param flags Signal how to synchronize
|
||||||
|
*/
|
||||||
|
private static void trySyncFileRange(int fd, long offset, long nbytes, int flags)
|
||||||
|
{
|
||||||
|
if (!initialized || !syncFileRangePossible || fd < 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
int ret_code = sync_file_range(fd, offset, nbytes, flags);
|
||||||
|
if (ret_code != 0) {
|
||||||
|
log.warn("failed on syncing fd [%d], offset [%d], bytes [%d], ret_code [%d], errno [%d]",
|
||||||
|
fd, offset, nbytes, ret_code, Native.getLastError());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (UnsupportedOperationException uoe) {
|
||||||
|
log.warn(uoe, "sync_file_range is not supported");
|
||||||
|
syncFileRangePossible = false;
|
||||||
|
}
|
||||||
|
catch (UnsatisfiedLinkError nle) {
|
||||||
|
log.warn(nle, "sync_file_range failed on fd [%d], offset [%d], bytes [%d]", fd, offset, nbytes);
|
||||||
|
syncFileRangePossible = false;
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.warn(e, "Unknown exception: sync_file_range failed on fd [%d], offset [%d], bytes [%d]",
|
||||||
|
fd, offset, nbytes);
|
||||||
|
syncFileRangePossible = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Copy from an input stream to a file minimizing cache impact on the destination.. This happens chunk by chunk
|
* Copy from an input stream to a file minimizing cache impact on the destination.. This happens chunk by chunk
|
||||||
* so only at most chunk size will be present in the OS page cache. Posix (Linux, BSD) only.
|
* so only at most chunk size will be present in the OS page cache. Posix (Linux, BSD) only. The implementation
|
||||||
|
* in this method is based on a post by Linus Torvalds here:
|
||||||
|
* http://lkml.iu.edu/hypermail/linux/kernel/1005.2/01845.html
|
||||||
*
|
*
|
||||||
* @param src Source InputStream where to copy from
|
* @param src Source InputStream where to copy from
|
||||||
* @param dest Destination file to copy to
|
* @param dest Destination file to copy to
|
||||||
|
@ -149,21 +205,33 @@ public class NativeIO
|
||||||
|
|
||||||
final byte[] buf = new byte[8 << 20]; // 8Mb buffer
|
final byte[] buf = new byte[8 << 20]; // 8Mb buffer
|
||||||
long offset = 0;
|
long offset = 0;
|
||||||
|
long lastOffset = 0;
|
||||||
|
|
||||||
try (
|
try (
|
||||||
final RandomAccessFile raf = new RandomAccessFile(dest, "rwd")
|
final RandomAccessFile raf = new RandomAccessFile(dest, "rw")
|
||||||
) {
|
) {
|
||||||
final int fd = getfd(raf.getFD());
|
final int fd = getfd(raf.getFD());
|
||||||
|
|
||||||
for (int numBytes = 0, bytesRead = 0; bytesRead > -1; ) {
|
for (int numBytes = 0, bytesRead = 0, lastBytes = 0; bytesRead > -1;) {
|
||||||
bytesRead = src.read(buf, numBytes, buf.length - numBytes);
|
bytesRead = src.read(buf, numBytes, buf.length - numBytes);
|
||||||
|
|
||||||
if (numBytes >= buf.length || bytesRead == -1) {
|
if (numBytes >= buf.length || bytesRead == -1) {
|
||||||
raf.write(buf, 0, numBytes);
|
raf.write(buf, 0, numBytes);
|
||||||
trySkipCache(fd, offset, numBytes);
|
|
||||||
|
// This won't block, but will start writeout asynchronously
|
||||||
|
trySyncFileRange(fd, offset, numBytes, SYNC_FILE_RANGE_WRITE);
|
||||||
|
if (offset > 0) {
|
||||||
|
// This does a blocking write-and-wait on any old ranges
|
||||||
|
trySyncFileRange(fd, lastOffset, lastBytes,
|
||||||
|
SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER);
|
||||||
|
// Remove the old range from the cache
|
||||||
|
trySkipCache(fd, lastOffset, lastBytes);
|
||||||
|
}
|
||||||
|
lastOffset = offset;
|
||||||
offset = raf.getFilePointer();
|
offset = raf.getFilePointer();
|
||||||
numBytes = 0;
|
numBytes = 0;
|
||||||
}
|
}
|
||||||
|
lastBytes = numBytes;
|
||||||
|
|
||||||
numBytes += bytesRead;
|
numBytes += bytesRead;
|
||||||
}
|
}
|
||||||
|
@ -177,8 +245,21 @@ public class NativeIO
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static boolean getFadvisePossible()
|
static boolean isFadvisePossible()
|
||||||
{
|
{
|
||||||
return fadvisePossible;
|
return fadvisePossible;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static void setSyncFileRangePossible(boolean setting)
|
||||||
|
{
|
||||||
|
syncFileRangePossible = setting;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static boolean isSyncFileRangePossible()
|
||||||
|
{
|
||||||
|
return syncFileRangePossible;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,7 @@ public class NativeIOTest
|
||||||
@Test
|
@Test
|
||||||
public void testDisabledFadviseChunkedCopy() throws Exception
|
public void testDisabledFadviseChunkedCopy() throws Exception
|
||||||
{
|
{
|
||||||
boolean possible = NativeIO.getFadvisePossible();
|
boolean possible = NativeIO.isFadvisePossible();
|
||||||
|
|
||||||
NativeIO.setFadvisePossible(false);
|
NativeIO.setFadvisePossible(false);
|
||||||
File f = tempFolder.newFile();
|
File f = tempFolder.newFile();
|
||||||
|
@ -73,4 +73,22 @@ public class NativeIOTest
|
||||||
Assert.assertTrue(Arrays.equals(bytes, data));
|
Assert.assertTrue(Arrays.equals(bytes, data));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDisabledSyncFileRangePossible() throws Exception
|
||||||
|
{
|
||||||
|
boolean possible = NativeIO.isSyncFileRangePossible();
|
||||||
|
|
||||||
|
NativeIO.setSyncFileRangePossible(false);
|
||||||
|
File f = tempFolder.newFile();
|
||||||
|
byte[] bytes = new byte[]{(byte) 0x8, (byte) 0x9};
|
||||||
|
|
||||||
|
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
|
||||||
|
NativeIO.chunkedCopy(bis, f);
|
||||||
|
|
||||||
|
byte[] data = Files.readAllBytes(f.toPath());
|
||||||
|
|
||||||
|
NativeIO.setSyncFileRangePossible(possible);
|
||||||
|
Assert.assertTrue(Arrays.equals(bytes, data));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue