From 129bf91c5e35807eb5624d8974681d9d700ea16b Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Wed, 9 Apr 2014 21:58:01 +0000 Subject: [PATCH] HDFS-6208. Merging change r1586154 from trunk to branch-2. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1586160 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../fsdataset/impl/FsDatasetCache.java | 4 +- .../fsdataset/impl/MappableBlock.java | 72 ++++++++++--------- .../server/namenode/TestCacheDirectives.java | 6 ++ 4 files changed, 50 insertions(+), 34 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ed720b390f6..5f4397a3908 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -111,6 +111,8 @@ Release 2.4.1 - UNRELEASED HDFS-6209. TestValidateConfigurationSettings should use random ports. (Arpit Agarwal via szetszwo) + HDFS-6208. DataNode caching can leak file descriptors. (cnauroth) + Release 2.4.0 - 2014-04-07 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index 975b836cbdd..f1c60a3069e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -395,6 +395,8 @@ public class FsDatasetCache { dataset.datanode.getMetrics().incrBlocksCached(1); success = true; } finally { + IOUtils.closeQuietly(blockIn); + IOUtils.closeQuietly(metaIn); if (!success) { if (reservedBytes) { newUsedBytes = usedBytesCount.release(length); @@ -403,8 +405,6 @@ public class FsDatasetCache { LOG.debug("Caching of " + key + " was aborted. We are now " + "caching only " + newUsedBytes + " + bytes in total."); } - IOUtils.closeQuietly(blockIn); - IOUtils.closeQuietly(metaIn); if (mappableBlock != null) { mappableBlock.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java index ae36822efae..948c694faf2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java @@ -28,6 +28,7 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; @@ -76,8 +77,9 @@ public class MappableBlock implements Closeable { String blockFileName) throws IOException { MappableBlock mappableBlock = null; MappedByteBuffer mmap = null; + FileChannel blockChannel = null; try { - FileChannel blockChannel = blockIn.getChannel(); + blockChannel = blockIn.getChannel(); if (blockChannel == null) { throw new IOException("Block InputStream has no FileChannel."); } @@ -86,6 +88,7 @@ public class MappableBlock implements Closeable { verifyChecksum(length, metaIn, blockChannel, blockFileName); mappableBlock = new MappableBlock(mmap, length); } finally { + IOUtils.closeQuietly(blockChannel); if (mappableBlock == null) { if (mmap != null) { NativeIO.POSIX.munmap(mmap); // unmapping also unlocks @@ -108,38 +111,43 @@ public class MappableBlock implements Closeable { BlockMetadataHeader.readHeader(new DataInputStream( new BufferedInputStream(metaIn, BlockMetadataHeader .getHeaderSize()))); - FileChannel metaChannel = metaIn.getChannel(); - if (metaChannel == null) { - throw new IOException("Block InputStream meta file has no FileChannel."); - } - DataChecksum checksum = header.getChecksum(); - final int bytesPerChecksum = checksum.getBytesPerChecksum(); - final int checksumSize = checksum.getChecksumSize(); - final int numChunks = (8*1024*1024) / bytesPerChecksum; - ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum); - ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize); - // Verify the checksum - int bytesVerified = 0; - while (bytesVerified < length) { - Preconditions.checkState(bytesVerified % bytesPerChecksum == 0, - "Unexpected partial chunk before EOF"); - assert bytesVerified % bytesPerChecksum == 0; - int bytesRead = fillBuffer(blockChannel, blockBuf); - if (bytesRead == -1) { - throw new IOException("checksum verification failed: premature EOF"); + FileChannel metaChannel = null; + try { + metaChannel = metaIn.getChannel(); + if (metaChannel == null) { + throw new IOException("Block InputStream meta file has no FileChannel."); } - blockBuf.flip(); - // Number of read chunks, including partial chunk at end - int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum; - checksumBuf.limit(chunks*checksumSize); - fillBuffer(metaChannel, checksumBuf); - checksumBuf.flip(); - checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName, - bytesVerified); - // Success - bytesVerified += bytesRead; - blockBuf.clear(); - checksumBuf.clear(); + DataChecksum checksum = header.getChecksum(); + final int bytesPerChecksum = checksum.getBytesPerChecksum(); + final int checksumSize = checksum.getChecksumSize(); + final int numChunks = (8*1024*1024) / bytesPerChecksum; + ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum); + ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize); + // Verify the checksum + int bytesVerified = 0; + while (bytesVerified < length) { + Preconditions.checkState(bytesVerified % bytesPerChecksum == 0, + "Unexpected partial chunk before EOF"); + assert bytesVerified % bytesPerChecksum == 0; + int bytesRead = fillBuffer(blockChannel, blockBuf); + if (bytesRead == -1) { + throw new IOException("checksum verification failed: premature EOF"); + } + blockBuf.flip(); + // Number of read chunks, including partial chunk at end + int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum; + checksumBuf.limit(chunks*checksumSize); + fillBuffer(metaChannel, checksumBuf); + checksumBuf.flip(); + checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName, + bytesVerified); + // Success + bytesVerified += bytesRead; + blockBuf.clear(); + checksumBuf.clear(); + } + } finally { + IOUtils.closeQuietly(metaChannel); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java index 4ac7601cbaa..ecc738aadcd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java @@ -147,6 +147,12 @@ public class TestCacheDirectives { @After public void teardown() throws Exception { + // Remove cache directives left behind by tests so that we release mmaps. + RemoteIterator iter = dfs.listCacheDirectives(null); + while (iter.hasNext()) { + dfs.removeCacheDirective(iter.next().getInfo().getId()); + } + waitForCachedBlocks(namenode, 0, 0, "teardown"); if (cluster != null) { cluster.shutdown(); }