From 8f07a39af70d51f275e0da69ff3fa7df7a89cd8d Mon Sep 17 00:00:00 2001 From: bolkedebruin Date: Thu, 8 Mar 2018 16:54:21 +0100 Subject: [PATCH] Skip OS cache on Linux when pulling segments (#5421) Druid relies on the page cache of Linux in order to have memory segments. However when loading segments from deep storage or rebalancing the page cache can get poisoned by segments that should not be in memory yet. This can significantly slow down Druid in case rebalancing happens as data that might not be queried often is suddenly in the page cache. This PR implements the same logic as is in Apache Cassandra and Apache Bookkeeper. Closes #4746 --- .../storage/hdfs/HdfsDataSegmentPuller.java | 8 +- java-util/pom.xml | 4 + .../java/util/common/CompressionUtils.java | 10 +- .../druid/java/util/common/io/NativeIO.java | 184 ++++++++++++++++++ .../java/util/common/io/NativeIOTest.java | 76 ++++++++ pom.xml | 5 + 6 files changed, 281 insertions(+), 6 deletions(-) create mode 100644 java-util/src/main/java/io/druid/java/util/common/io/NativeIO.java create mode 100644 java-util/src/test/java/io/druid/java/util/common/io/NativeIOTest.java diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java index 1ec9e16b451..66a7c4cc334 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java @@ -29,12 +29,14 @@ import io.druid.java.util.common.IAE; import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.UOE; +import io.druid.java.util.common.io.NativeIO; import io.druid.java.util.common.logger.Logger; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.URIDataPuller; import io.druid.timeline.DataSegment; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -197,9 +199,9 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller, URIDataPuller log.warn("[%s] is a child directory, skipping", childPath.toString()); } else { final File outFile = new File(outDir, fname); - - // Actual copy - fs.copyToLocalFile(childPath, new Path(outFile.toURI())); + try (final FSDataInputStream in = fs.open(childPath)) { + NativeIO.chunkedCopy(in, outFile); + } result.addFile(outFile); } } diff --git a/java-util/pom.xml b/java-util/pom.xml index 1efd28e039a..150c332a3ca 100644 --- a/java-util/pom.xml +++ b/java-util/pom.xml @@ -101,6 +101,10 @@ com.google.code.findbugs jsr305 + + net.java.dev.jna + jna + javax.validation validation-api diff --git a/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java b/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java index 40bbef91b82..b8498780c19 100644 --- a/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java +++ b/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java @@ -26,6 +26,7 @@ import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.common.io.Files; +import io.druid.java.util.common.io.NativeIO; import io.druid.java.util.common.logger.Logger; import java.io.BufferedInputStream; @@ -224,6 +225,7 @@ public class CompressionUtils final Enumeration enumeration = zipFile.entries(); while (enumeration.hasMoreElements()) { final ZipEntry entry = enumeration.nextElement(); + final File outFile = new File(outDir, entry.getName()); result.addFiles( FileUtils.retryCopy( new ByteSource() @@ -234,7 +236,7 @@ public class CompressionUtils return new BufferedInputStream(zipFile.getInputStream(entry)); } }, - new File(outDir, entry.getName()), + outFile, FileUtils.IS_EXCEPTION, DEFAULT_RETRY_COUNT ).getFiles() @@ -263,7 +265,9 @@ public class CompressionUtils ZipEntry entry; while ((entry = zipIn.getNextEntry()) != null) { final File file = new File(outDir, entry.getName()); - Files.asByteSink(file).writeFrom(zipIn); + + NativeIO.chunkedCopy(zipIn, file); + result.addFile(file); zipIn.closeEntry(); } @@ -297,7 +301,7 @@ public class CompressionUtils public static FileUtils.FileCopyResult gunzip(InputStream in, File outFile) throws IOException { try (GZIPInputStream gzipInputStream = gzipInputStream(in)) { - Files.asByteSink(outFile).writeFrom(gzipInputStream); + NativeIO.chunkedCopy(gzipInputStream, outFile); return new FileUtils.FileCopyResult(outFile); } } diff --git a/java-util/src/main/java/io/druid/java/util/common/io/NativeIO.java b/java-util/src/main/java/io/druid/java/util/common/io/NativeIO.java new file mode 100644 index 00000000000..6bd54584c0f --- /dev/null +++ b/java-util/src/main/java/io/druid/java/util/common/io/NativeIO.java @@ -0,0 +1,184 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.java.util.common.io; + +import java.io.File; +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.lang.reflect.Field; + +import com.google.common.annotations.VisibleForTesting; +import com.sun.jna.LastErrorException; +import com.sun.jna.Native; +import com.sun.jna.Platform; +import io.druid.java.util.common.logger.Logger; + +/** + * Native I/O operations in order to minimize cache impact. + */ +public class NativeIO +{ + private static final Logger log = new Logger(NativeIO.class); + + private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */ + private static Field field; + + private static volatile boolean initialized = false; + private static volatile boolean fadvisePossible = true; + + static { + field = getFieldByReflection(FileDescriptor.class, "fd"); + + try { + Native.register(Platform.C_LIBRARY_NAME); + initialized = true; + } + catch (NoClassDefFoundError e) { + log.info("JNA not found. Native methods will be disabled."); + } + catch (UnsatisfiedLinkError e) { + log.info("Unable to link C library. Native methods will be disabled."); + } + catch (NoSuchMethodError e) { + log.warn("Obsolete version of JNA present; unable to register C library"); + } + } + + private static native int posix_fadvise(int fd, long offset, long len, int flag) throws LastErrorException; + + private NativeIO() {} + + private static Field getFieldByReflection(Class cls, String fieldName) + { + Field field = null; + + try { + field = cls.getDeclaredField(fieldName); + field.setAccessible(true); + } + catch (Exception e) { + log.warn("Unable to read [%s] field from [%s]", fieldName, cls.getName()); + } + + return field; + } + /** + * Get system file descriptor (int) from FileDescriptor object. + * @param descriptor - FileDescriptor object to get fd from + * @return file descriptor, -1 or error + */ + public static int getfd(FileDescriptor descriptor) + { + try { + return field.getInt(descriptor); + } + catch (IllegalArgumentException | IllegalAccessException e) { + log.warn("Unable to read fd field from java.io.FileDescriptor"); + } + + return -1; + } + + /** + * Remove pages from the file system page cache when they wont + * be accessed again + * + * @param fd The file descriptor of the source file. + * @param offset The offset within the file. + * @param len The length to be flushed. + */ + + public static void trySkipCache(int fd, long offset, long len) + { + if (!initialized || !fadvisePossible || fd < 0) { + return; + } + try { + // we ignore the return value as this is just best effort to avoid the cache + posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED); + } + catch (UnsupportedOperationException uoe) { + log.warn(uoe, "posix_fadvise is not supported"); + fadvisePossible = false; + } + catch (UnsatisfiedLinkError ule) { + // if JNA is unavailable just skipping Direct I/O + // instance of this class will act like normal RandomAccessFile + log.warn(ule, "Unsatisfied Link error: posix_fadvise failed on file descriptor [%d], offset [%d]", + fd, offset); + fadvisePossible = false; + } + catch (Exception e) { + // This is best effort anyway so lets just log that there was an + // exception and forget + log.warn(e, "Unknown exception: posix_fadvise failed on file descriptor [%d], offset [%d]", + fd, offset); + } + } + + + /** + * Copy from an input stream to a file minimizing cache impact on the destination.. This happens chunk by chunk + * so only at most chunk size will be present in the OS page cache. Posix (Linux, BSD) only. + * + * @param src Source InputStream where to copy from + * @param dest Destination file to copy to + * @throws IOException + */ + public static void chunkedCopy(InputStream src, File dest) throws IOException + { + + final byte[] buf = new byte[8 << 20]; // 8Mb buffer + long offset = 0; + + try ( + final RandomAccessFile raf = new RandomAccessFile(dest, "rwd") + ) { + final int fd = getfd(raf.getFD()); + + for (int numBytes = 0, bytesRead = 0; bytesRead > -1; ) { + bytesRead = src.read(buf, numBytes, buf.length - numBytes); + + if (numBytes >= buf.length || bytesRead == -1) { + raf.write(buf, 0, numBytes); + trySkipCache(fd, offset, numBytes); + offset = raf.getFilePointer(); + numBytes = 0; + } + + numBytes += bytesRead; + } + } + } + + @VisibleForTesting + static void setFadvisePossible(boolean setting) + { + fadvisePossible = setting; + } + + @VisibleForTesting + static boolean getFadvisePossible() + { + return fadvisePossible; + } +} diff --git a/java-util/src/test/java/io/druid/java/util/common/io/NativeIOTest.java b/java-util/src/test/java/io/druid/java/util/common/io/NativeIOTest.java new file mode 100644 index 00000000000..e1f8c579cf5 --- /dev/null +++ b/java-util/src/test/java/io/druid/java/util/common/io/NativeIOTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.java.util.common.io; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; + +public class NativeIOTest +{ + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testChunkedCopy() throws Exception + { + File f = tempFolder.newFile(); + byte[] bytes = new byte[]{(byte) 0x8, (byte) 0x9}; + + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + NativeIO.chunkedCopy(bis, f); + + byte[] data = Files.readAllBytes(f.toPath()); + Assert.assertTrue(Arrays.equals(bytes, data)); + } + + @Test(expected = IOException.class) + public void testException() throws Exception + { + File dir = tempFolder.newFolder(); + NativeIO.chunkedCopy(null, dir); + } + + @Test + public void testDisabledFadviseChunkedCopy() throws Exception + { + boolean possible = NativeIO.getFadvisePossible(); + + NativeIO.setFadvisePossible(false); + File f = tempFolder.newFile(); + byte[] bytes = new byte[]{(byte) 0x8, (byte) 0x9}; + + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + NativeIO.chunkedCopy(bis, f); + + byte[] data = Files.readAllBytes(f.toPath()); + + NativeIO.setFadvisePossible(possible); + Assert.assertTrue(Arrays.equals(bytes, data)); + } + +} diff --git a/pom.xml b/pom.xml index b58adab336a..77b17ff454d 100644 --- a/pom.xml +++ b/pom.xml @@ -733,6 +733,11 @@ jvm-attach-api 1.2 + + net.java.dev.jna + jna + 4.5.1 +