From ba50a36a3ead628c3d44d384f7ed4d2b3a55dd07 Mon Sep 17 00:00:00 2001
From: Uma Maheswara Rao G
Date: Thu, 14 Mar 2019 22:21:08 -0700
Subject: [PATCH] HDFS-14354: Refactor MappableBlock to align with the
 implementation of SCM cache. Contributed by Feilong He.

---
 .../fsdataset/impl/FsDatasetCache.java        |  15 +-
 .../fsdataset/impl/MappableBlock.java         | 155 +-----------------
 .../fsdataset/impl/MappableBlockLoader.java   |  80 +++++++++
 .../impl/MemoryMappableBlockLoader.java       | 142 ++++++++++++++++
 .../fsdataset/impl/MemoryMappedBlock.java     |  54 ++++++
 5 files changed, 290 insertions(+), 156 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 767b150e1ff..9efd11a2635 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -227,6 +227,8 @@ public class FsDatasetCache {
    */
   private final long maxBytes;
 
+  private final MappableBlockLoader mappableBlockLoader;
+
   /**
    * Number of cache commands that could not be completed successfully
    */
@@ -236,7 +238,7 @@ public class FsDatasetCache {
    */
   final AtomicLong numBlocksFailedToUncache = new AtomicLong(0);
 
-  public FsDatasetCache(FsDatasetImpl dataset) {
+  public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
     this.dataset = dataset;
     this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
     ThreadFactory workerFactory = new ThreadFactoryBuilder()
@@ -268,6 +270,7 @@ public class FsDatasetCache {
           ". Reconfigure this to " + minRevocationPollingMs);
     }
     this.revocationPollingMs = confRevocationPollingMs;
+    this.mappableBlockLoader = new MemoryMappableBlockLoader();
   }
 
   /**
@@ -461,14 +464,14 @@ public class FsDatasetCache {
           return;
         }
         try {
-          mappableBlock = MappableBlock.
-              load(length, blockIn, metaIn, blockFileName);
+          mappableBlock = mappableBlockLoader.load(length, blockIn, metaIn,
+              blockFileName, key);
         } catch (ChecksumException e) {
           // Exception message is bogus since this wasn't caused by a file read
           LOG.warn("Failed to cache " + key + ": checksum verification failed.");
           return;
         } catch (IOException e) {
-          LOG.warn("Failed to cache " + key, e);
+          LOG.warn("Failed to cache the block [key=" + key + "]!", e);
           return;
         }
         synchronized (FsDatasetCache.this) {
@@ -498,9 +501,7 @@ public class FsDatasetCache {
         }
         LOG.debug("Caching of {} was aborted.  We are now caching only {} " +
            "bytes in total.", key, usedBytesCount.get());
-        if (mappableBlock != null) {
-          mappableBlock.close();
-        }
+        IOUtils.closeQuietly(mappableBlock);
         numBlocksFailedToCache.incrementAndGet();
 
         synchronized (FsDatasetCache.this) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
index 45aa364bf8d..0fff32741c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
@@ -18,164 +18,21 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import java.io.BufferedInputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
-
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.io.Closeable;
 
 /**
- * Represents an HDFS block that is mmapped by the DataNode.
+ * Represents an HDFS block that is mapped by the DataNode.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MappableBlock implements Closeable {
-  private MappedByteBuffer mmap;
-  private final long length;
-
-  MappableBlock(MappedByteBuffer mmap, long length) {
-    this.mmap = mmap;
-    this.length = length;
-    assert length > 0;
-  }
-
-  public long getLength() {
-    return length;
-  }
+public interface MappableBlock extends Closeable {
 
   /**
-   * Load the block.
-   *
-   * mmap and mlock the block, and then verify its checksum.
-   *
-   * @param length         The current length of the block.
-   * @param blockIn        The block input stream. Should be positioned at the
-   *                       start. The caller must close this.
-   * @param metaIn         The meta file input stream. Should be positioned at
-   *                       the start. The caller must close this.
-   * @param blockFileName  The block file name, for logging purposes.
-   *
-   * @return               The Mappable block.
+   * Get the number of bytes that have been cached.
+   * @return the number of bytes that have been cached.
    */
-  public static MappableBlock load(long length,
-      FileInputStream blockIn, FileInputStream metaIn,
-      String blockFileName) throws IOException {
-    MappableBlock mappableBlock = null;
-    MappedByteBuffer mmap = null;
-    FileChannel blockChannel = null;
-    try {
-      blockChannel = blockIn.getChannel();
-      if (blockChannel == null) {
-        throw new IOException("Block InputStream has no FileChannel.");
-      }
-      mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
-      NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
-      verifyChecksum(length, metaIn, blockChannel, blockFileName);
-      mappableBlock = new MappableBlock(mmap, length);
-    } finally {
-      IOUtils.closeQuietly(blockChannel);
-      if (mappableBlock == null) {
-        if (mmap != null) {
-          NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
-        }
-      }
-    }
-    return mappableBlock;
-  }
-
-  /**
-   * Verifies the block's checksum. This is an I/O intensive operation.
-   */
-  private static void verifyChecksum(long length,
-      FileInputStream metaIn, FileChannel blockChannel, String blockFileName)
-          throws IOException, ChecksumException {
-    // Verify the checksum from the block's meta file
-    // Get the DataChecksum from the meta file header
-    BlockMetadataHeader header =
-        BlockMetadataHeader.readHeader(new DataInputStream(
-            new BufferedInputStream(metaIn, BlockMetadataHeader
-                .getHeaderSize())));
-    FileChannel metaChannel = null;
-    try {
-      metaChannel = metaIn.getChannel();
-      if (metaChannel == null) {
-        throw new IOException("Block InputStream meta file has no FileChannel.");
-      }
-      DataChecksum checksum = header.getChecksum();
-      final int bytesPerChecksum = checksum.getBytesPerChecksum();
-      final int checksumSize = checksum.getChecksumSize();
-      final int numChunks = (8*1024*1024) / bytesPerChecksum;
-      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum);
-      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
-      // Verify the checksum
-      int bytesVerified = 0;
-      while (bytesVerified < length) {
-        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
-            "Unexpected partial chunk before EOF");
-        assert bytesVerified % bytesPerChecksum == 0;
-        int bytesRead = fillBuffer(blockChannel, blockBuf);
-        if (bytesRead == -1) {
-          throw new IOException("checksum verification failed: premature EOF");
-        }
-        blockBuf.flip();
-        // Number of read chunks, including partial chunk at end
-        int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum;
-        checksumBuf.limit(chunks*checksumSize);
-        fillBuffer(metaChannel, checksumBuf);
-        checksumBuf.flip();
-        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
-            bytesVerified);
-        // Success
-        bytesVerified += bytesRead;
-        blockBuf.clear();
-        checksumBuf.clear();
-      }
-    } finally {
-      IOUtils.closeQuietly(metaChannel);
-    }
-  }
-
-  /**
-   * Reads bytes into a buffer until EOF or the buffer's limit is reached
-   */
-  private static int fillBuffer(FileChannel channel, ByteBuffer buf)
-      throws IOException {
-    int bytesRead = channel.read(buf);
-    if (bytesRead < 0) {
-      //EOF
-      return bytesRead;
-    }
-    while (buf.remaining() > 0) {
-      int n = channel.read(buf);
-      if (n < 0) {
-        //EOF
-        return bytesRead;
-      }
-      bytesRead += n;
-    }
-    return bytesRead;
-  }
-
-  @Override
-  public void close() {
-    if (mmap != null) {
-      NativeIO.POSIX.munmap(mmap);
-      mmap = null;
-    }
-  }
+  long getLength();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
new file mode 100644
index 00000000000..a323f78394e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Maps block to DataNode cache region.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class MappableBlockLoader {
+
+  /**
+   * Load the block.
+   *
+   * Map the block, and then verify its checksum.
+   *
+   * @param length         The current length of the block.
+   * @param blockIn        The block input stream. Should be positioned at the
+   *                       start. The caller must close this.
+   * @param metaIn         The meta file input stream. Should be positioned at
+   *                       the start. The caller must close this.
+   * @param blockFileName  The block file name, for logging purposes.
+   * @param key            The extended block ID.
+   *
+   * @throws IOException   If mapping block to cache region fails or checksum
+   *                       fails.
+   *
+   * @return               The Mappable block.
+   */
+  abstract MappableBlock load(long length, FileInputStream blockIn,
+      FileInputStream metaIn, String blockFileName,
+      ExtendedBlockId key)
+      throws IOException;
+
+  /**
+   * Reads bytes into a buffer until EOF or the buffer's limit is reached.
+   */
+  protected int fillBuffer(FileChannel channel, ByteBuffer buf)
+      throws IOException {
+    int bytesRead = channel.read(buf);
+    if (bytesRead < 0) {
+      //EOF
+      return bytesRead;
+    }
+    while (buf.remaining() > 0) {
+      int n = channel.read(buf);
+      if (n < 0) {
+        //EOF
+        return bytesRead;
+      }
+      bytesRead += n;
+    }
+    return bytesRead;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
new file mode 100644
index 00000000000..7a0f7c75660
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.DataChecksum;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Maps block to memory.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MemoryMappableBlockLoader extends MappableBlockLoader {
+
+  /**
+   * Load the block.
+   *
+   * mmap and mlock the block, and then verify its checksum.
+   *
+   * @param length         The current length of the block.
+   * @param blockIn        The block input stream. Should be positioned at the
+   *                       start. The caller must close this.
+   * @param metaIn         The meta file input stream. Should be positioned at
+   *                       the start. The caller must close this.
+   * @param blockFileName  The block file name, for logging purposes.
+   * @param key            The extended block ID.
+   *
+   * @throws IOException   If mapping block to memory fails or checksum fails.
+   *
+   * @return               The Mappable block.
+   */
+  @Override
+  public MappableBlock load(long length, FileInputStream blockIn,
+      FileInputStream metaIn, String blockFileName,
+      ExtendedBlockId key)
+      throws IOException {
+    MemoryMappedBlock mappableBlock = null;
+    MappedByteBuffer mmap = null;
+    FileChannel blockChannel = null;
+    try {
+      blockChannel = blockIn.getChannel();
+      if (blockChannel == null) {
+        throw new IOException("Block InputStream has no FileChannel.");
+      }
+      mmap = blockChannel.map(FileChannel.MapMode.READ_ONLY, 0, length);
+      NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
+      verifyChecksum(length, metaIn, blockChannel, blockFileName);
+      mappableBlock = new MemoryMappedBlock(mmap, length);
+    } finally {
+      IOUtils.closeQuietly(blockChannel);
+      if (mappableBlock == null) {
+        if (mmap != null) {
+          NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
+        }
+      }
+    }
+    return mappableBlock;
+  }
+
+  /**
+   * Verifies the block's checksum. This is an I/O intensive operation.
+   */
+  public void verifyChecksum(long length, FileInputStream metaIn,
+      FileChannel blockChannel, String blockFileName)
+      throws IOException {
+    // Verify the checksum from the block's meta file
+    // Get the DataChecksum from the meta file header
+    BlockMetadataHeader header =
+        BlockMetadataHeader.readHeader(new DataInputStream(
+            new BufferedInputStream(metaIn, BlockMetadataHeader
+                .getHeaderSize())));
+    FileChannel metaChannel = null;
+    try {
+      metaChannel = metaIn.getChannel();
+      if (metaChannel == null) {
+        throw new IOException(
+            "Block InputStream meta file has no FileChannel.");
+      }
+      DataChecksum checksum = header.getChecksum();
+      final int bytesPerChecksum = checksum.getBytesPerChecksum();
+      final int checksumSize = checksum.getChecksumSize();
+      final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
+      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
+      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
+      // Verify the checksum
+      int bytesVerified = 0;
+      while (bytesVerified < length) {
+        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
+            "Unexpected partial chunk before EOF");
+        assert bytesVerified % bytesPerChecksum == 0;
+        int bytesRead = fillBuffer(blockChannel, blockBuf);
+        if (bytesRead == -1) {
+          throw new IOException("checksum verification failed: premature EOF");
+        }
+        blockBuf.flip();
+        // Number of read chunks, including partial chunk at end
+        int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
+        checksumBuf.limit(chunks * checksumSize);
+        fillBuffer(metaChannel, checksumBuf);
+        checksumBuf.flip();
+        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
+            bytesVerified);
+        // Success
+        bytesVerified += bytesRead;
+        blockBuf.clear();
+        checksumBuf.clear();
+      }
+    } finally {
+      IOUtils.closeQuietly(metaChannel);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
new file mode 100644
index 00000000000..c09ad1a5887
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.nio.MappedByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+/**
+ * Represents an HDFS block that is mapped to memory by the DataNode.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MemoryMappedBlock implements MappableBlock {
+  private MappedByteBuffer mmap;
+  private final long length;
+
+  MemoryMappedBlock(MappedByteBuffer mmap, long length) {
+    this.mmap = mmap;
+    this.length = length;
+    assert length > 0;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
+  }
+
+  @Override
+  public void close() {
+    if (mmap != null) {
+      NativeIO.POSIX.munmap(mmap);
+      mmap = null;
+    }
+  }
+}
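
Not part of the patch itself, but for illustration: after this refactoring,
FsDatasetCache constructs one MappableBlockLoader and calls its load() for
every cache command, so an alternative cache medium (such as the SCM cache
this change prepares for) only has to supply its own loader plus
MappableBlock implementation. Below is a minimal sketch of that extension
point; PmemMappableBlockLoader and the persistent-memory details are
hypothetical names for illustration, not classes introduced by this patch.

package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.ExtendedBlockId;

/**
 * Hypothetical sketch of a follow-up loader, showing the extension point
 * created by this patch. A real SCM-backed implementation would copy the
 * block into persistent memory rather than mmap and mlock it.
 */
public class PmemMappableBlockLoader extends MappableBlockLoader {
  @Override
  MappableBlock load(long length, FileInputStream blockIn,
      FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
      throws IOException {
    // A real implementation would:
    //  1. reserve a region on the persistent-memory device for this block;
    //  2. stream the block into that region, reusing the inherited
    //     fillBuffer() helper while verifying the checksum chunk by chunk;
    //  3. return a MappableBlock whose close() releases the region, the
    //     way MemoryMappedBlock.close() calls munmap().
    throw new UnsupportedOperationException("sketch only");
  }
}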