Skip OS cache on Linux when pulling segments (#5421)

Druid relies on the Linux page cache in order to serve segments from memory.
However when loading segments from deep storage or rebalancing the page
cache can get poisoned by segments that should not be in memory yet.
This can significantly slow down Druid in case rebalancing happens
as data that might not be queried often is suddenly in the page cache.

This PR implements the same logic as in Apache Cassandra and Apache
Bookkeeper.

Closes #4746
This commit is contained in:
bolkedebruin 2018-03-08 16:54:21 +01:00 committed by Charles Allen
parent 8fae0edc95
commit 8f07a39af7
6 changed files with 281 additions and 6 deletions

View File

@ -29,12 +29,14 @@ import io.druid.java.util.common.IAE;
import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.UOE; import io.druid.java.util.common.UOE;
import io.druid.java.util.common.io.NativeIO;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.URIDataPuller; import io.druid.segment.loading.URIDataPuller;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -197,9 +199,9 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller, URIDataPuller
log.warn("[%s] is a child directory, skipping", childPath.toString()); log.warn("[%s] is a child directory, skipping", childPath.toString());
} else { } else {
final File outFile = new File(outDir, fname); final File outFile = new File(outDir, fname);
try (final FSDataInputStream in = fs.open(childPath)) {
// Actual copy NativeIO.chunkedCopy(in, outFile);
fs.copyToLocalFile(childPath, new Path(outFile.toURI())); }
result.addFile(outFile); result.addFile(outFile);
} }
} }

View File

@ -101,6 +101,10 @@
<groupId>com.google.code.findbugs</groupId> <groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId> <artifactId>jsr305</artifactId>
</dependency> </dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</dependency>
<dependency> <dependency>
<groupId>javax.validation</groupId> <groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId> <artifactId>validation-api</artifactId>

View File

@ -26,6 +26,7 @@ import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Files; import com.google.common.io.Files;
import io.druid.java.util.common.io.NativeIO;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
@ -224,6 +225,7 @@ public class CompressionUtils
final Enumeration<? extends ZipEntry> enumeration = zipFile.entries(); final Enumeration<? extends ZipEntry> enumeration = zipFile.entries();
while (enumeration.hasMoreElements()) { while (enumeration.hasMoreElements()) {
final ZipEntry entry = enumeration.nextElement(); final ZipEntry entry = enumeration.nextElement();
final File outFile = new File(outDir, entry.getName());
result.addFiles( result.addFiles(
FileUtils.retryCopy( FileUtils.retryCopy(
new ByteSource() new ByteSource()
@ -234,7 +236,7 @@ public class CompressionUtils
return new BufferedInputStream(zipFile.getInputStream(entry)); return new BufferedInputStream(zipFile.getInputStream(entry));
} }
}, },
new File(outDir, entry.getName()), outFile,
FileUtils.IS_EXCEPTION, FileUtils.IS_EXCEPTION,
DEFAULT_RETRY_COUNT DEFAULT_RETRY_COUNT
).getFiles() ).getFiles()
@ -263,7 +265,9 @@ public class CompressionUtils
ZipEntry entry; ZipEntry entry;
while ((entry = zipIn.getNextEntry()) != null) { while ((entry = zipIn.getNextEntry()) != null) {
final File file = new File(outDir, entry.getName()); final File file = new File(outDir, entry.getName());
Files.asByteSink(file).writeFrom(zipIn);
NativeIO.chunkedCopy(zipIn, file);
result.addFile(file); result.addFile(file);
zipIn.closeEntry(); zipIn.closeEntry();
} }
@ -297,7 +301,7 @@ public class CompressionUtils
public static FileUtils.FileCopyResult gunzip(InputStream in, File outFile) throws IOException public static FileUtils.FileCopyResult gunzip(InputStream in, File outFile) throws IOException
{ {
try (GZIPInputStream gzipInputStream = gzipInputStream(in)) { try (GZIPInputStream gzipInputStream = gzipInputStream(in)) {
Files.asByteSink(outFile).writeFrom(gzipInputStream); NativeIO.chunkedCopy(gzipInputStream, outFile);
return new FileUtils.FileCopyResult(outFile); return new FileUtils.FileCopyResult(outFile);
} }
} }

View File

@ -0,0 +1,184 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.common.io;
import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jna.LastErrorException;
import com.sun.jna.Native;
import com.sun.jna.Platform;
import io.druid.java.util.common.logger.Logger;
/**
* Native I/O operations in order to minimize cache impact.
*/
public class NativeIO
{
private static final Logger log = new Logger(NativeIO.class);
private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */
private static Field field;
private static volatile boolean initialized = false;
private static volatile boolean fadvisePossible = true;
static {
field = getFieldByReflection(FileDescriptor.class, "fd");
try {
Native.register(Platform.C_LIBRARY_NAME);
initialized = true;
}
catch (NoClassDefFoundError e) {
log.info("JNA not found. Native methods will be disabled.");
}
catch (UnsatisfiedLinkError e) {
log.info("Unable to link C library. Native methods will be disabled.");
}
catch (NoSuchMethodError e) {
log.warn("Obsolete version of JNA present; unable to register C library");
}
}
private static native int posix_fadvise(int fd, long offset, long len, int flag) throws LastErrorException;
private NativeIO() {}
private static Field getFieldByReflection(Class cls, String fieldName)
{
Field field = null;
try {
field = cls.getDeclaredField(fieldName);
field.setAccessible(true);
}
catch (Exception e) {
log.warn("Unable to read [%s] field from [%s]", fieldName, cls.getName());
}
return field;
}
/**
* Get system file descriptor (int) from FileDescriptor object.
* @param descriptor - FileDescriptor object to get fd from
* @return file descriptor, -1 or error
*/
public static int getfd(FileDescriptor descriptor)
{
try {
return field.getInt(descriptor);
}
catch (IllegalArgumentException | IllegalAccessException e) {
log.warn("Unable to read fd field from java.io.FileDescriptor");
}
return -1;
}
/**
* Remove pages from the file system page cache when they wont
* be accessed again
*
* @param fd The file descriptor of the source file.
* @param offset The offset within the file.
* @param len The length to be flushed.
*/
public static void trySkipCache(int fd, long offset, long len)
{
if (!initialized || !fadvisePossible || fd < 0) {
return;
}
try {
// we ignore the return value as this is just best effort to avoid the cache
posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED);
}
catch (UnsupportedOperationException uoe) {
log.warn(uoe, "posix_fadvise is not supported");
fadvisePossible = false;
}
catch (UnsatisfiedLinkError ule) {
// if JNA is unavailable just skipping Direct I/O
// instance of this class will act like normal RandomAccessFile
log.warn(ule, "Unsatisfied Link error: posix_fadvise failed on file descriptor [%d], offset [%d]",
fd, offset);
fadvisePossible = false;
}
catch (Exception e) {
// This is best effort anyway so lets just log that there was an
// exception and forget
log.warn(e, "Unknown exception: posix_fadvise failed on file descriptor [%d], offset [%d]",
fd, offset);
}
}
/**
* Copy from an input stream to a file minimizing cache impact on the destination.. This happens chunk by chunk
* so only at most chunk size will be present in the OS page cache. Posix (Linux, BSD) only.
*
* @param src Source InputStream where to copy from
* @param dest Destination file to copy to
* @throws IOException
*/
public static void chunkedCopy(InputStream src, File dest) throws IOException
{
final byte[] buf = new byte[8 << 20]; // 8Mb buffer
long offset = 0;
try (
final RandomAccessFile raf = new RandomAccessFile(dest, "rwd")
) {
final int fd = getfd(raf.getFD());
for (int numBytes = 0, bytesRead = 0; bytesRead > -1; ) {
bytesRead = src.read(buf, numBytes, buf.length - numBytes);
if (numBytes >= buf.length || bytesRead == -1) {
raf.write(buf, 0, numBytes);
trySkipCache(fd, offset, numBytes);
offset = raf.getFilePointer();
numBytes = 0;
}
numBytes += bytesRead;
}
}
}
@VisibleForTesting
static void setFadvisePossible(boolean setting)
{
fadvisePossible = setting;
}
@VisibleForTesting
static boolean getFadvisePossible()
{
return fadvisePossible;
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.common.io;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
public class NativeIOTest
{
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void testChunkedCopy() throws Exception
{
File f = tempFolder.newFile();
byte[] bytes = new byte[]{(byte) 0x8, (byte) 0x9};
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
NativeIO.chunkedCopy(bis, f);
byte[] data = Files.readAllBytes(f.toPath());
Assert.assertTrue(Arrays.equals(bytes, data));
}
@Test(expected = IOException.class)
public void testException() throws Exception
{
File dir = tempFolder.newFolder();
NativeIO.chunkedCopy(null, dir);
}
@Test
public void testDisabledFadviseChunkedCopy() throws Exception
{
boolean possible = NativeIO.getFadvisePossible();
NativeIO.setFadvisePossible(false);
File f = tempFolder.newFile();
byte[] bytes = new byte[]{(byte) 0x8, (byte) 0x9};
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
NativeIO.chunkedCopy(bis, f);
byte[] data = Files.readAllBytes(f.toPath());
NativeIO.setFadvisePossible(possible);
Assert.assertTrue(Arrays.equals(bytes, data));
}
}

View File

@ -733,6 +733,11 @@
<artifactId>jvm-attach-api</artifactId> <artifactId>jvm-attach-api</artifactId>
<version>1.2</version> <version>1.2</version>
</dependency> </dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>4.5.1</version>
</dependency>
<!-- Test Scope --> <!-- Test Scope -->
<dependency> <dependency>