diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b671362e218..d3e5ed200ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -141,6 +141,8 @@ Release 0.23.3 - UNRELEASED HDFS-3057. httpfs and hdfs launcher scripts should honor CATALINA_HOME and HADOOP_LIBEXEC_DIR (rvs via tucu) + HDFS-3088. Move FSDatasetInterface inner classes to a package. (szetszwo) + OPTIMIZATIONS HDFS-2477. Optimize computing the diff between a block report and the namenode state. (Tomasz Nykiel via hairong) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java index d4cd2fe829f..fdc78c9b9a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.GenerationStamp; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -72,7 +72,7 @@ class BlockPoolSliceScanner { private final AtomicLong lastScanTime = new AtomicLong(); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FSDatasetInterface dataset; private final SortedSet blockInfoSet = new TreeSet(); @@ -134,7 +134,7 @@ public int compareTo(BlockScanInfo other) { } BlockPoolSliceScanner(String bpid, DataNode datanode, - FSDatasetInterface dataset, + FSDatasetInterface dataset, Configuration conf) { this.datanode = datanode; this.dataset = dataset; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index fd25c1df376..1449b88f8fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -38,12 +38,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -86,7 +86,7 @@ class BlockReceiver implements Closeable { private DataOutputStream 
mirrorOut; private Daemon responder = null; private DataTransferThrottler throttler; - private FSDataset.BlockWriteStreams streams; + private ReplicaOutputStreams streams; private DatanodeInfo srcDataNode = null; private Checksum partialCrc = null; private final DataNode datanode; @@ -202,16 +202,16 @@ class BlockReceiver implements Closeable { this.bytesPerChecksum = diskChecksum.getBytesPerChecksum(); this.checksumSize = diskChecksum.getChecksumSize(); - this.out = streams.dataOut; + this.out = streams.getDataOut(); if (out instanceof FileOutputStream) { this.outFd = ((FileOutputStream)out).getFD(); } else { LOG.warn("Could not get file descriptor for outputstream of class " + out.getClass()); } - this.cout = streams.checksumOut; + this.cout = streams.getChecksumOut(); this.checksumOut = new DataOutputStream(new BufferedOutputStream( - streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE)); + cout, HdfsConstants.SMALL_BUFFER_SIZE)); // write data chunk header if creating a new replica if (isCreate) { BlockMetadataHeader.writeHeader(checksumOut, diskChecksum); @@ -856,13 +856,13 @@ private void computePartialChunkCrc(long blkoff, long ckoff, // byte[] buf = new byte[sizePartialChunk]; byte[] crcbuf = new byte[checksumSize]; - FSDataset.BlockInputStreams instr = null; + ReplicaInputStreams instr = null; try { instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff); - IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk); + IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk); // open meta file and read in crc value computed earlier - IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length); + IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length); } finally { IOUtils.closeStream(instr); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java index c96be75f125..28c1dc46d70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; /************************************************** * BlockVolumeChoosingPolicy allows a DataNode to @@ -33,7 +33,7 @@ * ***************************************************/ @InterfaceAudience.Private -public interface BlockVolumeChoosingPolicy { +public interface BlockVolumeChoosingPolicy { /** * Returns a specific FSVolume after applying a suitable choice algorithm diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java index d52681509a3..63b2464e728 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java @@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.Block; import
org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; /** * DataBlockScanner manages block scanning for all the block pools. For each @@ -43,7 +43,7 @@ public class DataBlockScanner implements Runnable { public static final Log LOG = LogFactory.getLog(DataBlockScanner.class); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FSDatasetInterface dataset; private final Configuration conf; /** @@ -55,7 +55,7 @@ public class DataBlockScanner implements Runnable { Thread blockScannerThread = null; DataBlockScanner(DataNode datanode, - FSDatasetInterface dataset, + FSDatasetInterface dataset, Configuration conf) { this.datanode = datanode; this.dataset = dataset; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 59b2882b588..651d2bbd093 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -123,8 +123,8 @@ import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.Util; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets; @@ -235,7 +235,7 @@ public static InetSocketAddress createSocketAddr(String target volatile boolean shouldRun = true; private BlockPoolManager blockPoolManager; - volatile FSDatasetInterface data = null; + volatile FSDatasetInterface data = null; private String clusterId = null; public final static String EMPTY_DEL_HINT = ""; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index fe07754a3c7..da9bc79048f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -55,7 +55,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; @@ -511,7 +511,7 @@ public void blockChecksum(final ExtendedBlock block, checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ); updateCurrentThreadName("Reading metadata for block " + block); - final 
MetaDataInputStream metadataIn = + final LengthInputStream metadataIn = datanode.data.getMetaDataInputStream(block); final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream( metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index 97ff5a8416e..91ce4092d7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.GenerationStamp; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.util.Daemon; /** @@ -157,13 +157,13 @@ static class ScanInfo implements Comparable { private final long blockId; private final File metaFile; private final File blockFile; - private final FSVolumeInterface volume; + private final FsVolumeSpi volume; ScanInfo(long blockId) { this(blockId, null, null, null); } - ScanInfo(long blockId, File blockFile, File metaFile, FSVolumeInterface vol) { + ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) { this.blockId = blockId; this.metaFile = metaFile; this.blockFile = blockFile; @@ -182,7 +182,7 @@ long getBlockId() { return blockId; } - FSVolumeInterface getVolume() { + FsVolumeSpi getVolume() { return volume; } @@ -412,8 +412,8 @@ private void addDifference(LinkedList diffRecord, /** Is the given volume still valid in the dataset? 
*/ private static boolean isValid(final FSDatasetInterface dataset, - final FSVolumeInterface volume) { - for (FSVolumeInterface vol : dataset.getVolumes()) { + final FsVolumeSpi volume) { + for (FsVolumeSpi vol : dataset.getVolumes()) { if (vol == volume) { return true; } @@ -424,7 +424,7 @@ private static boolean isValid(final FSDatasetInterface dataset, /** Get lists of blocks on the disk sorted by blockId, per blockpool */ private Map getDiskReport() { // First get list of data directories - final List volumes = dataset.getVolumes(); + final List volumes = dataset.getVolumes(); ArrayList dirReports = new ArrayList(volumes.size()); @@ -473,9 +473,9 @@ private static boolean isBlockMetaFile(String blockId, String metaFile) { private static class ReportCompiler implements Callable { - private FSVolumeInterface volume; + private FsVolumeSpi volume; - public ReportCompiler(FSVolumeInterface volume) { + public ReportCompiler(FsVolumeSpi volume) { this.volume = volume; } @@ -492,7 +492,7 @@ public ScanInfoPerBlockPool call() throws Exception { } /** Compile list {@link ScanInfo} for the blocks in the directory */ - private LinkedList compileReport(FSVolumeInterface vol, File dir, + private LinkedList compileReport(FsVolumeSpi vol, File dir, LinkedList report) { File[] files; try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java index 4c7ea40d342..f970e01b5d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java @@ -61,7 +61,10 @@ import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; @@ -548,7 +551,7 @@ public void shutdown() { * * It uses the {@link FSDataset} object for synchronization. 
*/ - static class FSVolume implements FSVolumeInterface { + static class FSVolume implements FsVolumeSpi { private final FSDataset dataset; private final Map map = new HashMap(); private final File currentDir; // /current @@ -865,7 +868,7 @@ private long getCapacity() throws IOException { private long getRemaining() throws IOException { long remaining = 0L; - for (FSVolumeInterface vol : volumes) { + for (FsVolumeSpi vol : volumes) { remaining += vol.getAvailable(); } return remaining; @@ -1052,13 +1055,13 @@ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { } @Override // FSDatasetInterface - public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b) + public LengthInputStream getMetaDataInputStream(ExtendedBlock b) throws IOException { final File meta = getMetaFile(b); if (meta == null || !meta.exists()) { return null; } - return new MetaDataInputStream(new FileInputStream(meta), meta.length()); + return new LengthInputStream(new FileInputStream(meta), meta.length()); } private final DataNode datanode; @@ -1287,7 +1290,7 @@ private ReplicaInfo getReplicaInfo(String bpid, long blkid) * Returns handles to the block file and its metadata file */ @Override // FSDatasetInterface - public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b, + public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long ckoff) throws IOException { ReplicaInfo info = getReplicaInfo(b); File blockFile = info.getBlockFile(); @@ -1300,7 +1303,7 @@ public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b, if (ckoff > 0) { metaInFile.seek(ckoff); } - return new BlockInputStreams(new FileInputStream(blockInFile.getFD()), + return new ReplicaInputStreams(new FileInputStream(blockInFile.getFD()), new FileInputStream(metaInFile.getFD())); } @@ -1742,9 +1745,9 @@ public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) * last checksum will be overwritten. 
*/ @Override // FSDatasetInterface - public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams streams, + public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams, int checksumSize) throws IOException { - FileOutputStream file = (FileOutputStream) streams.checksumOut; + FileOutputStream file = (FileOutputStream) streams.getChecksumOut(); FileChannel channel = file.getChannel(); long oldPos = channel.position(); long newPos = oldPos - checksumSize; @@ -2195,7 +2198,7 @@ public String getStorageInfo() { */ @Override public void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FSVolumeInterface vol) { + File diskMetaFile, FsVolumeSpi vol) { Block corruptBlock = null; ReplicaInfo memBlockInfo; synchronized (this) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java index 029c2707281..910a1af8ce7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java @@ -18,12 +18,9 @@ package org.apache.hadoop.hdfs.server.datanode; -import java.io.Closeable; import java.io.File; -import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.List; import java.util.Map; @@ -34,11 +31,13 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.ReflectionUtils; @@ -50,7 +49,7 @@ * */ @InterfaceAudience.Private -public interface FSDatasetInterface +public interface FSDatasetInterface extends FSDatasetMBean { /** * A factory for creating FSDatasetInterface objects. @@ -77,24 +76,6 @@ public boolean isSimulated() { } } - /** - * This is an interface for the underlying volume. - * @see org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume - */ - interface FSVolumeInterface { - /** @return a list of block pools. */ - public String[] getBlockPoolList(); - - /** @return the available storage space in bytes. */ - public long getAvailable() throws IOException; - - /** @return the path to the volume */ - public String getPath(String bpid) throws IOException; - - /** @return the directory for the finalized blocks in the block pool. */ - public File getFinalizedDir(String bpid) throws IOException; - } - /** * Create rolling logs. * @@ -121,32 +102,15 @@ interface FSVolumeInterface { * as corrupted. 
*/ public void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FSVolumeInterface vol); + File diskMetaFile, FsVolumeSpi vol); - /** - * This class provides the input stream and length of the metadata - * of a block - * - */ - static class MetaDataInputStream extends FilterInputStream { - MetaDataInputStream(InputStream stream, long len) { - super(stream); - length = len; - } - private long length; - - public long getLength() { - return length; - } - } - /** * @param b - the block * @return a stream if the meta-data of the block exists; * otherwise, return null. * @throws IOException */ - public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b + public LengthInputStream getMetaDataInputStream(ExtendedBlock b ) throws IOException; /** @@ -197,58 +161,10 @@ public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) * starting at the offset * @throws IOException */ - public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, + public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, long ckoff) throws IOException; /** - * - * This class contains the output streams for the data and checksum - * of a block - * - */ - static class BlockWriteStreams { - OutputStream dataOut; - OutputStream checksumOut; - DataChecksum checksum; - - BlockWriteStreams(OutputStream dOut, OutputStream cOut, - DataChecksum checksum) { - dataOut = dOut; - checksumOut = cOut; - this.checksum = checksum; - } - - void close() { - IOUtils.closeStream(dataOut); - IOUtils.closeStream(checksumOut); - } - - DataChecksum getChecksum() { - return checksum; - } - } - - /** - * This class contains the input streams for the data and checksum - * of a block - */ - static class BlockInputStreams implements Closeable { - final InputStream dataIn; - final InputStream checksumIn; - - BlockInputStreams(InputStream dataIn, InputStream checksumIn) { - this.dataIn = dataIn; - this.checksumIn = checksumIn; - } - - @Override - public void close() { - IOUtils.closeStream(dataIn); - IOUtils.closeStream(checksumIn); - } - } - - /** * Creates a temporary replica and returns the meta information of the replica * * @param b block @@ -395,7 +311,7 @@ public void recoverClose(ExtendedBlock b, * @param checksumSize number of bytes each checksum has * @throws IOException */ - public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams stream, + public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams stream, int checksumSize) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java index 6aa10db2f26..2969b181088 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java @@ -21,7 +21,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; /** * This class describes a replica that has been finalized. 
@@ -38,7 +38,7 @@ class FinalizedReplica extends ReplicaInfo { * @param dir directory path where block and meta files are located */ FinalizedReplica(long blockId, long len, long genStamp, - FSVolumeInterface vol, File dir) { + FsVolumeSpi vol, File dir) { super(blockId, len, genStamp, vol, dir); } @@ -48,7 +48,7 @@ class FinalizedReplica extends ReplicaInfo { * @param vol volume where replica is located * @param dir directory path where block and meta files are located */ - FinalizedReplica(Block block, FSVolumeInterface vol, File dir) { + FinalizedReplica(Block block, FsVolumeSpi vol, File dir) { super(block, vol, dir); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java index f6458508cb7..e9427a2ad8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java @@ -21,7 +21,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; /** This class represents replicas being written. * Those are the replicas that @@ -36,7 +36,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline { * @param dir directory path where block and meta files are located */ ReplicaBeingWritten(long blockId, long genStamp, - FSVolumeInterface vol, File dir) { + FsVolumeSpi vol, File dir) { super( blockId, genStamp, vol, dir); } @@ -48,7 +48,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline { * @param writer a thread that is writing to this replica */ ReplicaBeingWritten(Block block, - FSVolumeInterface vol, File dir, Thread writer) { + FsVolumeSpi vol, File dir, Thread writer) { super( block, vol, dir, writer); } @@ -62,7 +62,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline { * @param writer a thread that is writing to this replica */ ReplicaBeingWritten(long blockId, long len, long genStamp, - FSVolumeInterface vol, File dir, Thread writer ) { + FsVolumeSpi vol, File dir, Thread writer ) { super( blockId, len, genStamp, vol, dir, writer); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index 82851c9f47a..b29d1c12b3c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -24,8 +24,8 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; @@ -53,7 +53,7 @@ class ReplicaInPipeline extends ReplicaInfo 
* @param state replica state */ ReplicaInPipeline(long blockId, long genStamp, - FSVolumeInterface vol, File dir) { + FsVolumeSpi vol, File dir) { this( blockId, 0L, genStamp, vol, dir, Thread.currentThread()); } @@ -65,7 +65,7 @@ class ReplicaInPipeline extends ReplicaInfo * @param writer a thread that is writing to this replica */ ReplicaInPipeline(Block block, - FSVolumeInterface vol, File dir, Thread writer) { + FsVolumeSpi vol, File dir, Thread writer) { this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(), vol, dir, writer); } @@ -80,7 +80,7 @@ class ReplicaInPipeline extends ReplicaInfo * @param writer a thread that is writing to this replica */ ReplicaInPipeline(long blockId, long len, long genStamp, - FSVolumeInterface vol, File dir, Thread writer ) { + FsVolumeSpi vol, File dir, Thread writer ) { super( blockId, len, genStamp, vol, dir); this.bytesAcked = len; this.bytesOnDisk = len; @@ -168,7 +168,7 @@ public int hashCode() { } @Override // ReplicaInPipelineInterface - public BlockWriteStreams createStreams(boolean isCreate, + public ReplicaOutputStreams createStreams(boolean isCreate, DataChecksum requestedChecksum) throws IOException { File blockFile = getBlockFile(); File metaFile = getMetaFile(); @@ -234,7 +234,7 @@ public BlockWriteStreams createStreams(boolean isCreate, blockOut.getChannel().position(blockDiskSize); crcOut.getChannel().position(crcDiskSize); } - return new BlockWriteStreams(blockOut, crcOut, checksum); + return new ReplicaOutputStreams(blockOut, crcOut, checksum); } catch (IOException e) { IOUtils.closeStream(blockOut); IOUtils.closeStream(metaRAF); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java index 17eefa98da2..235b4f6c423 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java @@ -19,7 +19,7 @@ import java.io.IOException; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.util.DataChecksum; /** @@ -66,6 +66,6 @@ interface ReplicaInPipelineInterface extends Replica { * @return output streams for writing * @throws IOException if any error occurs */ - public BlockWriteStreams createStreams(boolean isCreate, + public ReplicaOutputStreams createStreams(boolean isCreate, DataChecksum requestedChecksum) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index 65da8c7698e..b77eff19d9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import 
org.apache.hadoop.io.IOUtils; /** @@ -36,7 +36,7 @@ @InterfaceAudience.Private abstract public class ReplicaInfo extends Block implements Replica { /** volume where the replica belongs */ - private FSVolumeInterface volume; + private FsVolumeSpi volume; /** directory where block & meta files belong */ private File dir; @@ -47,7 +47,7 @@ abstract public class ReplicaInfo extends Block implements Replica { * @param vol volume where replica is located * @param dir directory path where block and meta files are located */ - ReplicaInfo(long blockId, long genStamp, FSVolumeInterface vol, File dir) { + ReplicaInfo(long blockId, long genStamp, FsVolumeSpi vol, File dir) { this( blockId, 0L, genStamp, vol, dir); } @@ -57,7 +57,7 @@ abstract public class ReplicaInfo extends Block implements Replica { * @param vol volume where replica is located * @param dir directory path where block and meta files are located */ - ReplicaInfo(Block block, FSVolumeInterface vol, File dir) { + ReplicaInfo(Block block, FsVolumeSpi vol, File dir) { this(block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(), vol, dir); } @@ -71,7 +71,7 @@ abstract public class ReplicaInfo extends Block implements Replica { * @param dir directory path where block and meta files are located */ ReplicaInfo(long blockId, long len, long genStamp, - FSVolumeInterface vol, File dir) { + FsVolumeSpi vol, File dir) { super(blockId, len, genStamp); this.volume = vol; this.dir = dir; @@ -113,14 +113,14 @@ File getMetaFile() { * Get the volume where this replica is located on disk * @return the volume where this replica is located on disk */ - FSVolumeInterface getVolume() { + FsVolumeSpi getVolume() { return volume; } /** * Set the volume where this replica is located on disk */ - void setVolume(FSVolumeInterface vol) { + void setVolume(FsVolumeSpi vol) { this.volume = vol; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java index 635bf831b34..2e15e6fce54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java @@ -20,7 +20,7 @@ import java.io.File; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; /** @@ -145,7 +145,7 @@ void setDir(File dir) { } @Override //ReplicaInfo - void setVolume(FSVolumeInterface vol) { + void setVolume(FsVolumeSpi vol) { super.setVolume(vol); original.setVolume(vol); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java index d37a06cdfd2..b26b77ecade 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java @@ -21,7 +21,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import 
org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; /** * This class represents a replica that is waiting to be recovered. @@ -44,7 +44,7 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo { * @param dir directory path where block and meta files are located */ ReplicaWaitingToBeRecovered(long blockId, long len, long genStamp, - FSVolumeInterface vol, File dir) { + FsVolumeSpi vol, File dir) { super(blockId, len, genStamp, vol, dir); } @@ -54,7 +54,7 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo { * @param vol volume where replica is located * @param dir directory path where block and meta files are located */ - ReplicaWaitingToBeRecovered(Block block, FSVolumeInterface vol, File dir) { + ReplicaWaitingToBeRecovered(Block block, FsVolumeSpi vol, File dir) { super(block, vol, dir); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java index 00fdffab2f7..228ebc06dc1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java @@ -20,10 +20,10 @@ import java.io.IOException; import java.util.List; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -public class RoundRobinVolumesPolicy +public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy { private int curVolume = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java new file mode 100644 index 00000000000..87b5ec55f4f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.File; +import java.io.IOException; + +/** + * This is an interface for the underlying volume. + */ +public interface FsVolumeSpi { + /** @return a list of block pools. */ + public String[] getBlockPoolList(); + + /** @return the available storage space in bytes. 
*/ + public long getAvailable() throws IOException; + + /** @return the path to the volume */ + public String getPath(String bpid) throws IOException; + + /** @return the directory for the finalized blocks in the block pool. */ + public File getFinalizedDir(String bpid) throws IOException; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/LengthInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/LengthInputStream.java new file mode 100644 index 00000000000..22ec6dc44a7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/LengthInputStream.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.FilterInputStream; +import java.io.InputStream; + +/** + * An input stream with length. + */ +public class LengthInputStream extends FilterInputStream { + + private final long length; + + /** + * Create a stream. + * @param in the underlying input stream. + * @param length the length of the stream. + */ + public LengthInputStream(InputStream in, long length) { + super(in); + this.length = length; + } + + /** @return the length. */ + public long getLength() { + return length; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java new file mode 100644 index 00000000000..0f9588f3512 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.Closeable; +import java.io.InputStream; + +import org.apache.hadoop.io.IOUtils; + +/** + * Contains the input streams for the data and checksum of a replica. + */ +public class ReplicaInputStreams implements Closeable { + private final InputStream dataIn; + private final InputStream checksumIn; + + /** Create an object with a data input stream and a checksum input stream. */ + public ReplicaInputStreams(InputStream dataIn, InputStream checksumIn) { + this.dataIn = dataIn; + this.checksumIn = checksumIn; + } + + /** @return the data input stream. */ + public InputStream getDataIn() { + return dataIn; + } + + /** @return the checksum input stream. */ + public InputStream getChecksumIn() { + return checksumIn; + } + + @Override + public void close() { + IOUtils.closeStream(dataIn); + IOUtils.closeStream(checksumIn); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java new file mode 100644 index 00000000000..3866392d934 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.Closeable; +import java.io.OutputStream; + +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.DataChecksum; + +/** + * Contains the output streams for the data and checksum of a replica. + */ +public class ReplicaOutputStreams implements Closeable { + private final OutputStream dataOut; + private final OutputStream checksumOut; + private final DataChecksum checksum; + + /** + * Create an object with a data output stream, a checksum output stream + * and a checksum. + */ + public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut, + DataChecksum checksum) { + this.dataOut = dataOut; + this.checksumOut = checksumOut; + this.checksum = checksum; + } + + /** @return the data output stream. */ + public OutputStream getDataOut() { + return dataOut; + } + + /** @return the checksum output stream. */ + public OutputStream getChecksumOut() { + return checksumOut; + } + + /** @return the checksum. 
*/ + public DataChecksum getChecksum() { + return checksum; + } + + @Override + public void close() { + IOUtils.closeStream(dataOut); + IOUtils.closeStream(checksumOut); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index acddcb4c73a..a37aefdefe5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -39,6 +39,10 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; @@ -61,8 +65,7 @@ * * Note the synchronization is coarse grained - it is at each method. */ -public class SimulatedFSDataset - implements FSDatasetInterface { +public class SimulatedFSDataset implements FSDatasetInterface { static class Factory extends FSDatasetInterface.Factory { @Override public SimulatedFSDataset createFSDatasetInterface(DataNode datanode, @@ -215,14 +218,14 @@ synchronized boolean isFinalized() { } @Override - synchronized public BlockWriteStreams createStreams(boolean isCreate, + synchronized public ReplicaOutputStreams createStreams(boolean isCreate, DataChecksum requestedChecksum) throws IOException { if (finalized) { throw new IOException("Trying to write to a finalized replica " + theBlock); } else { SimulatedOutputStream crcStream = new SimulatedOutputStream(); - return new BlockWriteStreams(oStream, crcStream, requestedChecksum); + return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum); } } @@ -688,13 +691,13 @@ public synchronized InputStream getBlockInputStream(ExtendedBlock b, /** Not supported */ @Override // FSDatasetInterface - public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, + public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, long ckoff) throws IOException { throw new IOException("Not supported"); } @Override // FSDatasetInterface - public synchronized MetaDataInputStream getMetaDataInputStream(ExtendedBlock b + public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b ) throws IOException { final Map map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); @@ -706,7 +709,7 @@ public synchronized MetaDataInputStream getMetaDataInputStream(ExtendedBlock b " is being written, its meta cannot be read"); } final SimulatedInputStream sin = binfo.getMetaIStream(); - return new MetaDataInputStream(sin, sin.getLength()); + return new LengthInputStream(sin, sin.getLength()); } @Override @@ -716,7 +719,7 @@ public void checkDataDir() throws DiskErrorException { @Override // FSDatasetInterface public synchronized void 
adjustCrcChannelPosition(ExtendedBlock b, - BlockWriteStreams stream, + ReplicaOutputStreams stream, int checksumSize) throws IOException { } @@ -959,12 +962,12 @@ public String[] getBlockPoolList() { @Override public void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FSVolumeInterface vol) { + File diskMetaFile, FsVolumeSpi vol) { throw new UnsupportedOperationException(); } @Override - public List getVolumes() { + public List getVolumes() { throw new UnsupportedOperationException(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 59a61cf2ea9..e197bb38b35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -36,7 +36,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -535,11 +535,11 @@ public void testNotMatchedReplicaID() throws IOException { LOG.debug("Running " + GenericTestUtils.getMethodName()); } ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block); - BlockWriteStreams streams = null; + ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512)); - streams.checksumOut.write('a'); + streams.getChecksumOut().write('a'); dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1)); try { dn.syncBlock(rBlock, initBlockRecords(dn)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java index 9737a251d32..615732d8d4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.io.IOUtils; import org.junit.Assert; import org.junit.Test; @@ -98,7 +98,7 @@ private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt) out.write(writeBuf); out.hflush(); DataNode dn = cluster.getDataNodes().get(0); - for (FSVolumeInterface v : dn.data.getVolumes()) { + for (FsVolumeSpi v : dn.data.getVolumes()) { FSVolume volume = (FSVolume)v; File currentDir = volume.getCurrentDir().getParentFile().getParentFile(); File rbwDir = new File(currentDir, 
"rbw"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java index f401be3af15..b1de62352d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java @@ -21,7 +21,7 @@ import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.ReflectionUtils; import org.junit.Assert; @@ -33,19 +33,19 @@ public class TestRoundRobinVolumesPolicy { // Test the Round-Robin block-volume choosing algorithm. @Test public void testRR() throws Exception { - final List volumes = new ArrayList(); + final List volumes = new ArrayList(); // First volume, with 100 bytes of space. - volumes.add(Mockito.mock(FSVolumeInterface.class)); + volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L); // Second volume, with 200 bytes of space. - volumes.add(Mockito.mock(FSVolumeInterface.class)); + volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L); @SuppressWarnings("unchecked") - final RoundRobinVolumesPolicy policy = - (RoundRobinVolumesPolicy)ReflectionUtils.newInstance( + final RoundRobinVolumesPolicy policy = + (RoundRobinVolumesPolicy)ReflectionUtils.newInstance( RoundRobinVolumesPolicy.class, null); // Test two rounds of round-robin choosing @@ -71,18 +71,18 @@ public void testRR() throws Exception { @Test public void testRRPolicyExceptionMessage() throws Exception { - final List volumes = new ArrayList(); + final List volumes = new ArrayList(); // First volume, with 500 bytes of space. - volumes.add(Mockito.mock(FSVolumeInterface.class)); + volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(0).getAvailable()).thenReturn(500L); // Second volume, with 600 bytes of space. 
- volumes.add(Mockito.mock(FSVolumeInterface.class)); + volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L); - final RoundRobinVolumesPolicy policy - = new RoundRobinVolumesPolicy(); + final RoundRobinVolumesPolicy policy + = new RoundRobinVolumesPolicy(); int blockSize = 700; try { policy.chooseVolume(volumes, blockSize); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index 40d55e644ae..d7c254d0bbb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.util.DataChecksum; /** @@ -63,10 +63,10 @@ int addSomeBlocks(SimulatedFSDataset fsdataset, int startingBlockId) // we pass expected len as zero, - fsdataset should use the sizeof actual // data written ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b); - BlockWriteStreams out = bInfo.createStreams(true, + ReplicaOutputStreams out = bInfo.createStreams(true, DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512)); try { - OutputStream dataOut = out.dataOut; + OutputStream dataOut = out.getDataOut(); assertEquals(0, fsdataset.getLength(b)); for (int j=1; j <= blockIdToLen(i); ++j) { dataOut.write(j);