HDFS-7696. In FsDatasetImpl, the getBlockInputStream(..) and getTmpInputStreams(..) methods may leak file descriptors.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
Tsz-Wo Nicholas Sze 2015-02-02 13:38:36 -08:00
parent a5568a276d
commit 8426c7d806
3 changed files with 29 additions and 28 deletions
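The underlying bug is an open-then-throw pattern: a RandomAccessFile is opened, and if a later seek(..) or the wrapping stream construction throws, nothing closes the already-open file, so its descriptor leaks. The sketch below shows the before/after shape of the fix in standalone form; it is illustrative only, with the hypothetical class name OpenAndSeekSketch, and plain close() standing in for Hadoop's IOUtils.cleanup(..).

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class OpenAndSeekSketch {

      // Pre-patch shape: if seek(..) or getFD() throws after the file is
      // opened, the RandomAccessFile is never closed and its descriptor leaks.
      static FileInputStream openAndSeekLeaky(File file, long offset)
          throws IOException {
        RandomAccessFile raf = new RandomAccessFile(file, "r");
        if (offset > 0) {
          raf.seek(offset); // may throw; nothing closes raf on this path
        }
        return new FileInputStream(raf.getFD());
      }

      // Patched shape: any failure between open and return closes the
      // RandomAccessFile before rethrowing, so no descriptor is left behind.
      static FileInputStream openAndSeek(File file, long offset)
          throws IOException {
        RandomAccessFile raf = null;
        try {
          raf = new RandomAccessFile(file, "r");
          if (offset > 0) {
            raf.seek(offset);
          }
          return new FileInputStream(raf.getFD());
        } catch (IOException ioe) {
          if (raf != null) {
            raf.close();
          }
          throw ioe;
        }
      }
    }

On the success path the returned FileInputStream shares the descriptor obtained from raf.getFD(), so closing the stream is what eventually releases it; the RandomAccessFile object itself does not need to be retained.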

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -542,6 +542,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7423. various typos and message formatting fixes in nfs daemon and
     doc. (Charles Lamb via yliu)
 
+    HDFS-7696. In FsDatasetImpl, the getBlockInputStream(..) and
+    getTmpInputStreams(..) methods may leak file descriptors. (szetszwo)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java

@@ -18,8 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 
 import java.io.Closeable;
-import java.io.FileDescriptor;
-import java.io.FileInputStream;
 import java.io.InputStream;
 
 import org.apache.hadoop.io.IOUtils;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -47,15 +47,13 @@ import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
-import com.google.common.collect.Lists;
-import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -76,8 +74,8 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
@@ -93,7 +91,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
-import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -111,6 +109,9 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
  * has a unique name and an extent on disk.
@@ -573,18 +574,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (isNativeIOAvailable) {
       return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
     } else {
-      RandomAccessFile blockInFile;
       try {
-        blockInFile = new RandomAccessFile(blockFile, "r");
+        return openAndSeek(blockFile, seekOffset);
       } catch (FileNotFoundException fnfe) {
         throw new IOException("Block " + b + " is not valid. " +
             "Expected block file at " + blockFile + " does not exist.");
       }
-      if (seekOffset > 0) {
-        blockInFile.seek(seekOffset);
-      }
-      return new FileInputStream(blockInFile.getFD());
     }
   }
 
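Only the non-native fallback path changes here: the inline RandomAccessFile handling is replaced by a call to the new openAndSeek(..) helper (added further down in this file), while the translation of FileNotFoundException into a descriptive IOException for a missing block file is preserved. The NativeIO share-delete path is untouched.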
@@ -629,24 +624,14 @@
    * Returns handles to the block file and its metadata file
    */
   @Override // FsDatasetSpi
   public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
-      long blkOffset, long ckoff) throws IOException {
+      long blkOffset, long metaOffset) throws IOException {
     ReplicaInfo info = getReplicaInfo(b);
     FsVolumeReference ref = info.getVolume().obtainReference();
     try {
-      File blockFile = info.getBlockFile();
-      RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
-      if (blkOffset > 0) {
-        blockInFile.seek(blkOffset);
-      }
-      File metaFile = info.getMetaFile();
-      RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
-      if (ckoff > 0) {
-        metaInFile.seek(ckoff);
-      }
-      InputStream blockInStream = new FileInputStream(blockInFile.getFD());
+      InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
       try {
-        InputStream metaInStream = new FileInputStream(metaInFile.getFD());
+        InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset);
         return new ReplicaInputStreams(blockInStream, metaInStream, ref);
       } catch (IOException e) {
         IOUtils.cleanup(null, blockInStream);
@@ -658,6 +643,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  private static FileInputStream openAndSeek(File file, long offset)
+      throws IOException {
+    RandomAccessFile raf = null;
+    try {
+      raf = new RandomAccessFile(file, "r");
+      if (offset > 0) {
+        raf.seek(offset);
+      }
+      return new FileInputStream(raf.getFD());
+    } catch(IOException ioe) {
+      IOUtils.cleanup(null, raf);
+      throw ioe;
+    }
+  }
+
   static File moveBlockFiles(Block b, File srcfile, File destdir)
       throws IOException {
     final File dstfile = new File(destdir, b.getBlockName());
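Two layers of cleanup now cooperate in getTmpInputStreams(..): openAndSeek(..) closes its own RandomAccessFile if the open-or-seek sequence fails, and the inner try closes the already-opened block stream when opening the meta stream fails, so the outer handler can still release the volume reference without leaving a dangling descriptor on either failure path. The rename of the second parameter from ckoff to metaOffset also makes explicit that the offset applies to the metadata file.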