From f982f9662322dbe32e1d5a703f0fabc8720815e2 Mon Sep 17 00:00:00 2001
From: Rakesh Radhakrishnan
Date: Sun, 26 May 2019 14:30:11 +0530
Subject: [PATCH] HDFS-14402. Use FileChannel.transferTo() method for
 transferring block to SCM cache. Contributed by Feilong He.

(cherry picked from commit 37900c5639f8ba8d41b9fedc3d41ee0fbda7d5db)
---
 .../fsdataset/impl/MappableBlockLoader.java   |  59 ++++++++++
 .../impl/MemoryMappableBlockLoader.java       |  59 ----------
 .../impl/PmemMappableBlockLoader.java         | 110 +++---------
 3 files changed, 75 insertions(+), 153 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 044e5c59273..3ec84164c87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -18,10 +18,16 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.util.DataChecksum;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -107,6 +113,59 @@ public abstract class MappableBlockLoader {
     // Do nothing.
   }
 
+  /**
+   * Verifies the block's checksum. This is an I/O intensive operation.
+   */
+  protected void verifyChecksum(long length, FileInputStream metaIn,
+      FileChannel blockChannel, String blockFileName)
+      throws IOException {
+    // Verify the checksum from the block's meta file
+    // Get the DataChecksum from the meta file header
+    BlockMetadataHeader header =
+        BlockMetadataHeader.readHeader(new DataInputStream(
+            new BufferedInputStream(metaIn, BlockMetadataHeader
+                .getHeaderSize())));
+    FileChannel metaChannel = null;
+    try {
+      metaChannel = metaIn.getChannel();
+      if (metaChannel == null) {
+        throw new IOException(
+            "Block InputStream meta file has no FileChannel.");
+      }
+      DataChecksum checksum = header.getChecksum();
+      final int bytesPerChecksum = checksum.getBytesPerChecksum();
+      final int checksumSize = checksum.getChecksumSize();
+      final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
+      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
+      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
+      // Verify the checksum
+      int bytesVerified = 0;
+      while (bytesVerified < length) {
+        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
+            "Unexpected partial chunk before EOF");
+        assert bytesVerified % bytesPerChecksum == 0;
+        int bytesRead = fillBuffer(blockChannel, blockBuf);
+        if (bytesRead == -1) {
+          throw new IOException("checksum verification failed: premature EOF");
+        }
+        blockBuf.flip();
+        // Number of read chunks, including partial chunk at end
+        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
+        checksumBuf.limit(chunks * checksumSize);
+        fillBuffer(metaChannel, checksumBuf);
+        checksumBuf.flip();
+        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
+            bytesVerified);
+        // Success
+        bytesVerified += bytesRead;
+        blockBuf.clear();
+        checksumBuf.clear();
+      }
+    } finally {
+      IOUtils.closeQuietly(metaChannel);
+    }
+  }
+
   /**
    * Reads bytes into a buffer until EOF or the buffer's limit is reached.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 919835a5ee2..52d8d931c04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -18,22 +18,16 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
@@ -98,59 +92,6 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
     return mappableBlock;
   }
 
-  /**
-   * Verifies the block's checksum. This is an I/O intensive operation.
-   */
-  private void verifyChecksum(long length, FileInputStream metaIn,
-      FileChannel blockChannel, String blockFileName)
-      throws IOException {
-    // Verify the checksum from the block's meta file
-    // Get the DataChecksum from the meta file header
-    BlockMetadataHeader header =
-        BlockMetadataHeader.readHeader(new DataInputStream(
-            new BufferedInputStream(metaIn, BlockMetadataHeader
-                .getHeaderSize())));
-    FileChannel metaChannel = null;
-    try {
-      metaChannel = metaIn.getChannel();
-      if (metaChannel == null) {
-        throw new IOException(
-            "Block InputStream meta file has no FileChannel.");
-      }
-      DataChecksum checksum = header.getChecksum();
-      final int bytesPerChecksum = checksum.getBytesPerChecksum();
-      final int checksumSize = checksum.getChecksumSize();
-      final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
-      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
-      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
-      // Verify the checksum
-      int bytesVerified = 0;
-      while (bytesVerified < length) {
-        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
-            "Unexpected partial chunk before EOF");
-        assert bytesVerified % bytesPerChecksum == 0;
-        int bytesRead = fillBuffer(blockChannel, blockBuf);
-        if (bytesRead == -1) {
-          throw new IOException("checksum verification failed: premature EOF");
-        }
-        blockBuf.flip();
-        // Number of read chunks, including partial chunk at end
-        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
-        checksumBuf.limit(chunks * checksumSize);
-        fillBuffer(metaChannel, checksumBuf);
-        checksumBuf.flip();
-        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
-            bytesVerified);
-        // Success
-        bytesVerified += bytesRead;
-        blockBuf.clear();
-        checksumBuf.clear();
-      }
-    } finally {
-      IOUtils.closeQuietly(metaChannel);
-    }
-  }
-
   @Override
   public long getCacheUsed() {
     return memCacheStats.getCacheUsed();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index 05a9ba717e2..239fff815b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -18,25 +18,17 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
 /**
@@ -79,112 +71,42 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
    */
   @Override
   MappableBlock load(long length, FileInputStream blockIn,
-                     FileInputStream metaIn, String blockFileName,
-                     ExtendedBlockId key)
+      FileInputStream metaIn, String blockFileName,
+      ExtendedBlockId key)
       throws IOException {
     PmemMappedBlock mappableBlock = null;
-    String filePath = null;
+    String cachePath = null;
     FileChannel blockChannel = null;
-    RandomAccessFile file = null;
-    MappedByteBuffer out = null;
+    RandomAccessFile cacheFile = null;
     try {
       blockChannel = blockIn.getChannel();
       if (blockChannel == null) {
         throw new IOException("Block InputStream has no FileChannel.");
       }
+      cachePath = pmemVolumeManager.getCachePath(key);
+      cacheFile = new RandomAccessFile(cachePath, "rw");
+      blockChannel.transferTo(0, length, cacheFile.getChannel());
+
+      // Verify checksum for the cached data instead of block file.
+      // The file channel should be repositioned.
+      cacheFile.getChannel().position(0);
+      verifyChecksum(length, metaIn, cacheFile.getChannel(), blockFileName);
 
-      filePath = pmemVolumeManager.getCachePath(key);
-      file = new RandomAccessFile(filePath, "rw");
-      out = file.getChannel().
-          map(FileChannel.MapMode.READ_WRITE, 0, length);
-      if (out == null) {
-        throw new IOException("Failed to map the block " + blockFileName +
-            " to persistent storage.");
-      }
-      verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
-          blockFileName);
       mappableBlock = new PmemMappedBlock(length, key);
       LOG.info("Successfully cached one replica:{} into persistent memory"
-          + ", [cached path={}, length={}]", key, filePath, length);
+          + ", [cached path={}, length={}]", key, cachePath, length);
     } finally {
       IOUtils.closeQuietly(blockChannel);
-      if (out != null) {
-        NativeIO.POSIX.munmap(out);
-      }
-      IOUtils.closeQuietly(file);
+      IOUtils.closeQuietly(cacheFile);
       if (mappableBlock == null) {
-        LOG.debug("Delete {} due to unsuccessful mapping.", filePath);
-        FsDatasetUtil.deleteMappedFile(filePath);
+        LOG.debug("Delete {} due to unsuccessful mapping.", cachePath);
+        FsDatasetUtil.deleteMappedFile(cachePath);
       }
     }
     return mappableBlock;
   }
 
-  /**
-   * Verifies the block's checksum meanwhile maps block to persistent memory.
-   * This is an I/O intensive operation.
-   */
-  private void verifyChecksumAndMapBlock(
-      MappedByteBuffer out, long length, FileInputStream metaIn,
-      FileChannel blockChannel, String blockFileName)
-      throws IOException {
-    // Verify the checksum from the block's meta file
-    // Get the DataChecksum from the meta file header
-    BlockMetadataHeader header =
-        BlockMetadataHeader.readHeader(new DataInputStream(
-            new BufferedInputStream(metaIn, BlockMetadataHeader
-                .getHeaderSize())));
-    FileChannel metaChannel = null;
-    try {
-      metaChannel = metaIn.getChannel();
-      if (metaChannel == null) {
-        throw new IOException("Cannot get FileChannel from " +
-            "Block InputStream meta file.");
-      }
-      DataChecksum checksum = header.getChecksum();
-      final int bytesPerChecksum = checksum.getBytesPerChecksum();
-      final int checksumSize = checksum.getChecksumSize();
-      final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
-      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
-      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
-      // Verify the checksum
-      int bytesVerified = 0;
-      while (bytesVerified < length) {
-        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
-            "Unexpected partial chunk before EOF");
-        assert bytesVerified % bytesPerChecksum == 0;
-        int bytesRead = fillBuffer(blockChannel, blockBuf);
-        if (bytesRead == -1) {
-          throw new IOException(
-              "Checksum verification failed for the block " + blockFileName +
-              ": premature EOF");
-        }
-        blockBuf.flip();
-        // Number of read chunks, including partial chunk at end
-        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
-        checksumBuf.limit(chunks * checksumSize);
-        fillBuffer(metaChannel, checksumBuf);
-        checksumBuf.flip();
-        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
-            bytesVerified);
-
-        // / Copy data to persistent file
-        out.put(blockBuf);
-        // positioning the
-        bytesVerified += bytesRead;
-
-        // Clear buffer
-        blockBuf.clear();
-        checksumBuf.clear();
-      }
-      // Forces to write data to storage device containing the mapped file
-      out.force();
-    } finally {
-      IOUtils.closeQuietly(metaChannel);
-    }
-  }
-
   @Override
   public long getCacheUsed() {
     return pmemVolumeManager.getCacheUsed();