HDFS-6208. DataNode caching can leak file descriptors. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1586154 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-04-09 21:45:57 +00:00
parent 8af0708580
commit 5c48f379ab
4 changed files with 50 additions and 34 deletions

View File

@ -359,6 +359,8 @@ Release 2.4.1 - UNRELEASED
HDFS-6209. TestValidateConfigurationSettings should use random ports. HDFS-6209. TestValidateConfigurationSettings should use random ports.
(Arpit Agarwal via szetszwo) (Arpit Agarwal via szetszwo)
HDFS-6208. DataNode caching can leak file descriptors. (cnauroth)
Release 2.4.0 - 2014-04-07 Release 2.4.0 - 2014-04-07
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -395,6 +395,8 @@ public class FsDatasetCache {
dataset.datanode.getMetrics().incrBlocksCached(1); dataset.datanode.getMetrics().incrBlocksCached(1);
success = true; success = true;
} finally { } finally {
IOUtils.closeQuietly(blockIn);
IOUtils.closeQuietly(metaIn);
if (!success) { if (!success) {
if (reservedBytes) { if (reservedBytes) {
newUsedBytes = usedBytesCount.release(length); newUsedBytes = usedBytesCount.release(length);
@ -403,8 +405,6 @@ public class FsDatasetCache {
LOG.debug("Caching of " + key + " was aborted. We are now " + LOG.debug("Caching of " + key + " was aborted. We are now " +
"caching only " + newUsedBytes + " + bytes in total."); "caching only " + newUsedBytes + " + bytes in total.");
} }
IOUtils.closeQuietly(blockIn);
IOUtils.closeQuietly(metaIn);
if (mappableBlock != null) { if (mappableBlock != null) {
mappableBlock.close(); mappableBlock.close();
} }

View File

@ -28,6 +28,7 @@ import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode; import java.nio.channels.FileChannel.MapMode;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
@ -76,8 +77,9 @@ public class MappableBlock implements Closeable {
String blockFileName) throws IOException { String blockFileName) throws IOException {
MappableBlock mappableBlock = null; MappableBlock mappableBlock = null;
MappedByteBuffer mmap = null; MappedByteBuffer mmap = null;
FileChannel blockChannel = null;
try { try {
FileChannel blockChannel = blockIn.getChannel(); blockChannel = blockIn.getChannel();
if (blockChannel == null) { if (blockChannel == null) {
throw new IOException("Block InputStream has no FileChannel."); throw new IOException("Block InputStream has no FileChannel.");
} }
@ -86,6 +88,7 @@ public class MappableBlock implements Closeable {
verifyChecksum(length, metaIn, blockChannel, blockFileName); verifyChecksum(length, metaIn, blockChannel, blockFileName);
mappableBlock = new MappableBlock(mmap, length); mappableBlock = new MappableBlock(mmap, length);
} finally { } finally {
IOUtils.closeQuietly(blockChannel);
if (mappableBlock == null) { if (mappableBlock == null) {
if (mmap != null) { if (mmap != null) {
NativeIO.POSIX.munmap(mmap); // unmapping also unlocks NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
@ -108,38 +111,43 @@ public class MappableBlock implements Closeable {
BlockMetadataHeader.readHeader(new DataInputStream( BlockMetadataHeader.readHeader(new DataInputStream(
new BufferedInputStream(metaIn, BlockMetadataHeader new BufferedInputStream(metaIn, BlockMetadataHeader
.getHeaderSize()))); .getHeaderSize())));
FileChannel metaChannel = metaIn.getChannel(); FileChannel metaChannel = null;
if (metaChannel == null) { try {
throw new IOException("Block InputStream meta file has no FileChannel."); metaChannel = metaIn.getChannel();
} if (metaChannel == null) {
DataChecksum checksum = header.getChecksum(); throw new IOException("Block InputStream meta file has no FileChannel.");
final int bytesPerChecksum = checksum.getBytesPerChecksum();
final int checksumSize = checksum.getChecksumSize();
final int numChunks = (8*1024*1024) / bytesPerChecksum;
ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum);
ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
// Verify the checksum
int bytesVerified = 0;
while (bytesVerified < length) {
Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
"Unexpected partial chunk before EOF");
assert bytesVerified % bytesPerChecksum == 0;
int bytesRead = fillBuffer(blockChannel, blockBuf);
if (bytesRead == -1) {
throw new IOException("checksum verification failed: premature EOF");
} }
blockBuf.flip(); DataChecksum checksum = header.getChecksum();
// Number of read chunks, including partial chunk at end final int bytesPerChecksum = checksum.getBytesPerChecksum();
int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum; final int checksumSize = checksum.getChecksumSize();
checksumBuf.limit(chunks*checksumSize); final int numChunks = (8*1024*1024) / bytesPerChecksum;
fillBuffer(metaChannel, checksumBuf); ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum);
checksumBuf.flip(); ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName, // Verify the checksum
bytesVerified); int bytesVerified = 0;
// Success while (bytesVerified < length) {
bytesVerified += bytesRead; Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
blockBuf.clear(); "Unexpected partial chunk before EOF");
checksumBuf.clear(); assert bytesVerified % bytesPerChecksum == 0;
int bytesRead = fillBuffer(blockChannel, blockBuf);
if (bytesRead == -1) {
throw new IOException("checksum verification failed: premature EOF");
}
blockBuf.flip();
// Number of read chunks, including partial chunk at end
int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum;
checksumBuf.limit(chunks*checksumSize);
fillBuffer(metaChannel, checksumBuf);
checksumBuf.flip();
checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
bytesVerified);
// Success
bytesVerified += bytesRead;
blockBuf.clear();
checksumBuf.clear();
}
} finally {
IOUtils.closeQuietly(metaChannel);
} }
} }

View File

@ -147,6 +147,12 @@ public class TestCacheDirectives {
@After @After
public void teardown() throws Exception { public void teardown() throws Exception {
// Remove cache directives left behind by tests so that we release mmaps.
RemoteIterator<CacheDirectiveEntry> iter = dfs.listCacheDirectives(null);
while (iter.hasNext()) {
dfs.removeCacheDirective(iter.next().getInfo().getId());
}
waitForCachedBlocks(namenode, 0, 0, "teardown");
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }