From ca252c931d70c7e304ccd5a62ec33aa2e0217cc4 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Fri, 16 Mar 2012 17:36:57 +0000 Subject: [PATCH] svn merge -c 1301661 from trunk for HDFS-3088. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1301663 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../datanode/BlockPoolSliceScanner.java | 6 +- .../hdfs/server/datanode/BlockReceiver.java | 18 ++-- .../datanode/BlockVolumeChoosingPolicy.java | 4 +- .../server/datanode/DataBlockScanner.java | 6 +- .../hadoop/hdfs/server/datanode/DataNode.java | 4 +- .../hdfs/server/datanode/DataXceiver.java | 4 +- .../server/datanode/DirectoryScanner.java | 20 ++-- .../hdfs/server/datanode/FSDataset.java | 23 ++-- .../server/datanode/FSDatasetInterface.java | 102 ++---------------- .../server/datanode/FinalizedReplica.java | 6 +- .../server/datanode/ReplicaBeingWritten.java | 8 +- .../server/datanode/ReplicaInPipeline.java | 14 +-- .../datanode/ReplicaInPipelineInterface.java | 4 +- .../hdfs/server/datanode/ReplicaInfo.java | 14 +-- .../server/datanode/ReplicaUnderRecovery.java | 4 +- .../datanode/ReplicaWaitingToBeRecovered.java | 6 +- .../datanode/RoundRobinVolumesPolicy.java | 4 +- .../datanode/fsdataset/FsVolumeSpi.java | 38 +++++++ .../datanode/fsdataset/LengthInputStream.java | 44 ++++++++ .../fsdataset/ReplicaInputStreams.java | 53 +++++++++ .../fsdataset/ReplicaOutputStreams.java | 65 +++++++++++ .../server/datanode/SimulatedFSDataset.java | 23 ++-- .../server/datanode/TestBlockRecovery.java | 6 +- .../server/datanode/TestDatanodeRestart.java | 4 +- .../datanode/TestRoundRobinVolumesPolicy.java | 22 ++-- .../datanode/TestSimulatedFSDataset.java | 6 +- 27 files changed, 317 insertions(+), 193 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/LengthInputStream.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b671362e218..d3e5ed200ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -141,6 +141,8 @@ Release 0.23.3 - UNRELEASED HDFS-3057. httpfs and hdfs launcher scripts should honor CATALINA_HOME and HADOOP_LIBEXEC_DIR (rvs via tucu) + HDFS-3088. Move FSDatasetInterface inner classes to a package. (szetszwo) + OPTIMIZATIONS HDFS-2477. Optimize computing the diff between a block report and the namenode state. 
(Tomasz Nykiel via hairong) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java index d4cd2fe829f..fdc78c9b9a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.GenerationStamp; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -72,7 +72,7 @@ class BlockPoolSliceScanner { private final AtomicLong lastScanTime = new AtomicLong(); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FSDatasetInterface dataset; private final SortedSet blockInfoSet = new TreeSet(); @@ -134,7 +134,7 @@ class BlockPoolSliceScanner { } BlockPoolSliceScanner(String bpid, DataNode datanode, - FSDatasetInterface dataset, + FSDatasetInterface dataset, Configuration conf) { this.datanode = datanode; this.dataset = dataset; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index fd25c1df376..1449b88f8fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -38,12 +38,12 @@ import org.apache.hadoop.fs.FSOutputSummer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -86,7 +86,7 @@ class BlockReceiver implements Closeable { private DataOutputStream mirrorOut; private Daemon responder = null; private DataTransferThrottler throttler; - private FSDataset.BlockWriteStreams streams; + private ReplicaOutputStreams streams; private DatanodeInfo srcDataNode = null; private Checksum partialCrc = null; private final DataNode datanode; @@ -202,16 +202,16 @@ class BlockReceiver implements Closeable { this.bytesPerChecksum = diskChecksum.getBytesPerChecksum(); this.checksumSize = diskChecksum.getChecksumSize(); - this.out = streams.dataOut; + this.out = 
streams.getDataOut(); if (out instanceof FileOutputStream) { this.outFd = ((FileOutputStream)out).getFD(); } else { LOG.warn("Could not get file descriptor for outputstream of class " + out.getClass()); } - this.cout = streams.checksumOut; + this.cout = streams.getChecksumOut(); this.checksumOut = new DataOutputStream(new BufferedOutputStream( - streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE)); + cout, HdfsConstants.SMALL_BUFFER_SIZE)); // write data chunk header if creating a new replica if (isCreate) { BlockMetadataHeader.writeHeader(checksumOut, diskChecksum); @@ -856,13 +856,13 @@ class BlockReceiver implements Closeable { // byte[] buf = new byte[sizePartialChunk]; byte[] crcbuf = new byte[checksumSize]; - FSDataset.BlockInputStreams instr = null; + ReplicaInputStreams instr = null; try { instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff); - IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk); + IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk); // open meta file and read in crc value computer earlier - IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length); + IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length); } finally { IOUtils.closeStream(instr); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java index c96be75f125..28c1dc46d70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; /************************************************** * BlockVolumeChoosingPolicy allows a DataNode to @@ -33,7 +33,7 @@ import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterfa * ***************************************************/ @InterfaceAudience.Private -public interface BlockVolumeChoosingPolicy { +public interface BlockVolumeChoosingPolicy { /** * Returns a specific FSVolume after applying a suitable choice algorithm diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java index d52681509a3..63b2464e728 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java @@ -31,7 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; /** * DataBlockScanner manages block scanning for all the block pools. 
For each @@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterfa public class DataBlockScanner implements Runnable { public static final Log LOG = LogFactory.getLog(DataBlockScanner.class); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FSDatasetInterface dataset; private final Configuration conf; /** @@ -55,7 +55,7 @@ public class DataBlockScanner implements Runnable { Thread blockScannerThread = null; DataBlockScanner(DataNode datanode, - FSDatasetInterface dataset, + FSDatasetInterface dataset, Configuration conf) { this.datanode = datanode; this.dataset = dataset; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 59b2882b588..651d2bbd093 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -123,8 +123,8 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.Util; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets; @@ -235,7 +235,7 @@ public class DataNode extends Configured volatile boolean shouldRun = true; private BlockPoolManager blockPoolManager; - volatile FSDatasetInterface data = null; + volatile FSDatasetInterface data = null; private String clusterId = null; public final static String EMPTY_DEL_HINT = ""; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index fe07754a3c7..da9bc79048f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -55,7 +55,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; @@ -511,7 +511,7 @@ class DataXceiver extends Receiver implements Runnable { checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ); updateCurrentThreadName("Reading metadata for block " + block); - final MetaDataInputStream metadataIn = + final LengthInputStream 
metadataIn = datanode.data.getMetaDataInputStream(block); final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream( metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index 97ff5a8416e..91ce4092d7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.GenerationStamp; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.util.Daemon; /** @@ -157,13 +157,13 @@ public class DirectoryScanner implements Runnable { private final long blockId; private final File metaFile; private final File blockFile; - private final FSVolumeInterface volume; + private final FsVolumeSpi volume; ScanInfo(long blockId) { this(blockId, null, null, null); } - ScanInfo(long blockId, File blockFile, File metaFile, FSVolumeInterface vol) { + ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) { this.blockId = blockId; this.metaFile = metaFile; this.blockFile = blockFile; @@ -182,7 +182,7 @@ public class DirectoryScanner implements Runnable { return blockId; } - FSVolumeInterface getVolume() { + FsVolumeSpi getVolume() { return volume; } @@ -412,8 +412,8 @@ public class DirectoryScanner implements Runnable { /** Is the given volume still valid in the dataset? 
*/ private static boolean isValid(final FSDatasetInterface dataset, - final FSVolumeInterface volume) { - for (FSVolumeInterface vol : dataset.getVolumes()) { + final FsVolumeSpi volume) { + for (FsVolumeSpi vol : dataset.getVolumes()) { if (vol == volume) { return true; } @@ -424,7 +424,7 @@ public class DirectoryScanner implements Runnable { /** Get lists of blocks on the disk sorted by blockId, per blockpool */ private Map getDiskReport() { // First get list of data directories - final List volumes = dataset.getVolumes(); + final List volumes = dataset.getVolumes(); ArrayList dirReports = new ArrayList(volumes.size()); @@ -473,9 +473,9 @@ public class DirectoryScanner implements Runnable { private static class ReportCompiler implements Callable { - private FSVolumeInterface volume; + private FsVolumeSpi volume; - public ReportCompiler(FSVolumeInterface volume) { + public ReportCompiler(FsVolumeSpi volume) { this.volume = volume; } @@ -492,7 +492,7 @@ public class DirectoryScanner implements Runnable { } /** Compile list {@link ScanInfo} for the blocks in the directory */ - private LinkedList compileReport(FSVolumeInterface vol, File dir, + private LinkedList compileReport(FsVolumeSpi vol, File dir, LinkedList report) { File[] files; try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java index 4c7ea40d342..f970e01b5d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java @@ -61,7 +61,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; @@ -548,7 +551,7 @@ class FSDataset implements FSDatasetInterface { * * It uses the {@link FSDataset} object for synchronization. 
*/ - static class FSVolume implements FSVolumeInterface { + static class FSVolume implements FsVolumeSpi { private final FSDataset dataset; private final Map map = new HashMap(); private final File currentDir; // /current @@ -865,7 +868,7 @@ class FSDataset implements FSDatasetInterface { private long getRemaining() throws IOException { long remaining = 0L; - for (FSVolumeInterface vol : volumes) { + for (FsVolumeSpi vol : volumes) { remaining += vol.getAvailable(); } return remaining; @@ -1052,13 +1055,13 @@ class FSDataset implements FSDatasetInterface { } @Override // FSDatasetInterface - public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b) + public LengthInputStream getMetaDataInputStream(ExtendedBlock b) throws IOException { final File meta = getMetaFile(b); if (meta == null || !meta.exists()) { return null; } - return new MetaDataInputStream(new FileInputStream(meta), meta.length()); + return new LengthInputStream(new FileInputStream(meta), meta.length()); } private final DataNode datanode; @@ -1287,7 +1290,7 @@ class FSDataset implements FSDatasetInterface { * Returns handles to the block file and its metadata file */ @Override // FSDatasetInterface - public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b, + public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long ckoff) throws IOException { ReplicaInfo info = getReplicaInfo(b); File blockFile = info.getBlockFile(); @@ -1300,7 +1303,7 @@ class FSDataset implements FSDatasetInterface { if (ckoff > 0) { metaInFile.seek(ckoff); } - return new BlockInputStreams(new FileInputStream(blockInFile.getFD()), + return new ReplicaInputStreams(new FileInputStream(blockInFile.getFD()), new FileInputStream(metaInFile.getFD())); } @@ -1742,9 +1745,9 @@ class FSDataset implements FSDatasetInterface { * last checksum will be overwritten. 
*/ @Override // FSDatasetInterface - public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams streams, + public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams, int checksumSize) throws IOException { - FileOutputStream file = (FileOutputStream) streams.checksumOut; + FileOutputStream file = (FileOutputStream) streams.getChecksumOut(); FileChannel channel = file.getChannel(); long oldPos = channel.position(); long newPos = oldPos - checksumSize; @@ -2195,7 +2198,7 @@ class FSDataset implements FSDatasetInterface { */ @Override public void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FSVolumeInterface vol) { + File diskMetaFile, FsVolumeSpi vol) { Block corruptBlock = null; ReplicaInfo memBlockInfo; synchronized (this) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java index 029c2707281..910a1af8ce7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java @@ -18,12 +18,9 @@ package org.apache.hadoop.hdfs.server.datanode; -import java.io.Closeable; import java.io.File; -import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.List; import java.util.Map; @@ -34,11 +31,13 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.ReflectionUtils; @@ -50,7 +49,7 @@ import org.apache.hadoop.util.ReflectionUtils; * */ @InterfaceAudience.Private -public interface FSDatasetInterface +public interface FSDatasetInterface extends FSDatasetMBean { /** * A factory for creating FSDatasetInterface objects. @@ -77,24 +76,6 @@ public interface FSDatasetInterface +public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy { private int curVolume = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java new file mode 100644 index 00000000000..87b5ec55f4f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.File; +import java.io.IOException; + +/** + * This is an interface for the underlying volume. + */ +public interface FsVolumeSpi { + /** @return a list of block pools. */ + public String[] getBlockPoolList(); + + /** @return the available storage space in bytes. */ + public long getAvailable() throws IOException; + + /** @return the path to the volume */ + public String getPath(String bpid) throws IOException; + + /** @return the directory for the finalized blocks in the block pool. */ + public File getFinalizedDir(String bpid) throws IOException; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/LengthInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/LengthInputStream.java new file mode 100644 index 00000000000..22ec6dc44a7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/LengthInputStream.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.FilterInputStream; +import java.io.InputStream; + +/** + * An input stream with length. + */ +public class LengthInputStream extends FilterInputStream { + + private final long length; + + /** + * Create an stream. + * @param in the underlying input stream. + * @param length the length of the stream. + */ + public LengthInputStream(InputStream in, long length) { + super(in); + this.length = length; + } + + /** @return the length. 
*/ + public long getLength() { + return length; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java new file mode 100644 index 00000000000..0f9588f3512 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.Closeable; +import java.io.InputStream; + +import org.apache.hadoop.io.IOUtils; + +/** + * Contains the input streams for the data and checksum of a replica. + */ +public class ReplicaInputStreams implements Closeable { + private final InputStream dataIn; + private final InputStream checksumIn; + + /** Create an object with a data input stream and a checksum input stream. */ + public ReplicaInputStreams(InputStream dataIn, InputStream checksumIn) { + this.dataIn = dataIn; + this.checksumIn = checksumIn; + } + + /** @return the data input stream. */ + public InputStream getDataIn() { + return dataIn; + } + + /** @return the checksum input stream. */ + public InputStream getChecksumIn() { + return checksumIn; + } + + @Override + public void close() { + IOUtils.closeStream(dataIn); + IOUtils.closeStream(checksumIn); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java new file mode 100644 index 00000000000..3866392d934 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.Closeable; +import java.io.OutputStream; + +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.DataChecksum; + +/** + * Contains the output streams for the data and checksum of a replica. + */ +public class ReplicaOutputStreams implements Closeable { + private final OutputStream dataOut; + private final OutputStream checksumOut; + private final DataChecksum checksum; + + /** + * Create an object with a data output stream, a checksum output stream + * and a checksum. + */ + public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut, + DataChecksum checksum) { + this.dataOut = dataOut; + this.checksumOut = checksumOut; + this.checksum = checksum; + } + + /** @return the data output stream. */ + public OutputStream getDataOut() { + return dataOut; + } + + /** @return the checksum output stream. */ + public OutputStream getChecksumOut() { + return checksumOut; + } + + /** @return the checksum. */ + public DataChecksum getChecksum() { + return checksum; + } + + @Override + public void close() { + IOUtils.closeStream(dataOut); + IOUtils.closeStream(checksumOut); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index acddcb4c73a..a37aefdefe5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -39,6 +39,10 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; @@ -61,8 +65,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException; * * Note the synchronization is coarse grained - it is at each method. 
*/ -public class SimulatedFSDataset - implements FSDatasetInterface { +public class SimulatedFSDataset implements FSDatasetInterface { static class Factory extends FSDatasetInterface.Factory { @Override public SimulatedFSDataset createFSDatasetInterface(DataNode datanode, @@ -215,14 +218,14 @@ public class SimulatedFSDataset } @Override - synchronized public BlockWriteStreams createStreams(boolean isCreate, + synchronized public ReplicaOutputStreams createStreams(boolean isCreate, DataChecksum requestedChecksum) throws IOException { if (finalized) { throw new IOException("Trying to write to a finalized replica " + theBlock); } else { SimulatedOutputStream crcStream = new SimulatedOutputStream(); - return new BlockWriteStreams(oStream, crcStream, requestedChecksum); + return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum); } } @@ -688,13 +691,13 @@ public class SimulatedFSDataset /** Not supported */ @Override // FSDatasetInterface - public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, + public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, long ckoff) throws IOException { throw new IOException("Not supported"); } @Override // FSDatasetInterface - public synchronized MetaDataInputStream getMetaDataInputStream(ExtendedBlock b + public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b ) throws IOException { final Map map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); @@ -706,7 +709,7 @@ public class SimulatedFSDataset " is being written, its meta cannot be read"); } final SimulatedInputStream sin = binfo.getMetaIStream(); - return new MetaDataInputStream(sin, sin.getLength()); + return new LengthInputStream(sin, sin.getLength()); } @Override @@ -716,7 +719,7 @@ public class SimulatedFSDataset @Override // FSDatasetInterface public synchronized void adjustCrcChannelPosition(ExtendedBlock b, - BlockWriteStreams stream, + ReplicaOutputStreams stream, int checksumSize) throws IOException { } @@ -959,12 +962,12 @@ public class SimulatedFSDataset @Override public void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FSVolumeInterface vol) { + File diskMetaFile, FsVolumeSpi vol) { throw new UnsupportedOperationException(); } @Override - public List getVolumes() { + public List getVolumes() { throw new UnsupportedOperationException(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 59a61cf2ea9..e197bb38b35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -36,7 +36,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ 
-535,11 +535,11 @@ public class TestBlockRecovery {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
     ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
-    BlockWriteStreams streams = null;
+    ReplicaOutputStreams streams = null;
     try {
       streams = replicaInfo.createStreams(true,
           DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
-      streams.checksumOut.write('a');
+      streams.getChecksumOut().write('a');
       dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
       try {
         dn.syncBlock(rBlock, initBlockRecords(dn));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
index 9737a251d32..615732d8d4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
-import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -98,7 +98,7 @@ public class TestDatanodeRestart {
       out.write(writeBuf);
       out.hflush();
       DataNode dn = cluster.getDataNodes().get(0);
-      for (FSVolumeInterface v : dn.data.getVolumes()) {
+      for (FsVolumeSpi v : dn.data.getVolumes()) {
         FSVolume volume = (FSVolume)v;
         File currentDir = volume.getCurrentDir().getParentFile().getParentFile();
         File rbwDir = new File(currentDir, "rbw");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
index f401be3af15..b1de62352d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java
@@ -21,7 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.Assert;
@@ -33,19 +33,19 @@ public class TestRoundRobinVolumesPolicy {
   // Test the Round-Robin block-volume choosing algorithm.
   @Test
   public void testRR() throws Exception {
-    final List<FSVolumeInterface> volumes = new ArrayList<FSVolumeInterface>();
+    final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
 
     // First volume, with 100 bytes of space.
-    volumes.add(Mockito.mock(FSVolumeInterface.class));
+    volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
 
     // Second volume, with 200 bytes of space.
-    volumes.add(Mockito.mock(FSVolumeInterface.class));
+    volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
 
     @SuppressWarnings("unchecked")
-    final RoundRobinVolumesPolicy<FSVolumeInterface> policy = 
-        (RoundRobinVolumesPolicy<FSVolumeInterface>)ReflectionUtils.newInstance(
+    final RoundRobinVolumesPolicy<FsVolumeSpi> policy = 
+        (RoundRobinVolumesPolicy<FsVolumeSpi>)ReflectionUtils.newInstance(
         RoundRobinVolumesPolicy.class, null);
 
     // Test two rounds of round-robin choosing
@@ -71,18 +71,18 @@ public class TestRoundRobinVolumesPolicy {
 
   @Test
   public void testRRPolicyExceptionMessage() throws Exception {
-    final List<FSVolumeInterface> volumes = new ArrayList<FSVolumeInterface>();
+    final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
 
     // First volume, with 500 bytes of space.
-    volumes.add(Mockito.mock(FSVolumeInterface.class));
+    volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(0).getAvailable()).thenReturn(500L);
 
     // Second volume, with 600 bytes of space.
-    volumes.add(Mockito.mock(FSVolumeInterface.class));
+    volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
 
-    final RoundRobinVolumesPolicy<FSVolumeInterface> policy
-        = new RoundRobinVolumesPolicy<FSVolumeInterface>();
+    final RoundRobinVolumesPolicy<FsVolumeSpi> policy
+        = new RoundRobinVolumesPolicy<FsVolumeSpi>();
     int blockSize = 700;
     try {
       policy.chooseVolume(volumes, blockSize);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 40d55e644ae..d7c254d0bbb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
@@ -63,10 +63,10 @@ public class TestSimulatedFSDataset extends TestCase {
       // we pass expected len as zero, - fsdataset should use the sizeof actual
       // data written
       ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
-      BlockWriteStreams out = bInfo.createStreams(true,
+      ReplicaOutputStreams out = bInfo.createStreams(true,
          DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
       try {
-        OutputStream dataOut = out.dataOut;
+        OutputStream dataOut = out.getDataOut();
         assertEquals(0, fsdataset.getLength(b));
         for (int j=1; j <= blockIdToLen(i); ++j) {
           dataOut.write(j);
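
The hunks above complete the rename on the test side. For readers who only want to see how the relocated classes are consumed, the following is a minimal usage sketch; it is not part of the patch, the class name FsdatasetStreamsExample is hypothetical, and it assumes a hadoop-hdfs build that already contains the new org.apache.hadoop.hdfs.server.datanode.fsdataset package. ReplicaOutputStreams replaces FSDatasetInterface.BlockWriteStreams and exposes its streams through getDataOut()/getChecksumOut() accessors instead of public fields, while LengthInputStream replaces MetaDataInputStream as an InputStream that also carries the length of its source.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.util.DataChecksum;

public class FsdatasetStreamsExample {  // hypothetical example class, not in the patch
  public static void main(String[] args) throws IOException {
    // Same checksum type and chunk size as used by the tests in this patch.
    DataChecksum checksum =
        DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);

    // ReplicaOutputStreams (formerly FSDatasetInterface.BlockWriteStreams):
    // callers now go through the getDataOut()/getChecksumOut() accessors.
    ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
    ByteArrayOutputStream checksumOut = new ByteArrayOutputStream();
    ReplicaOutputStreams outs =
        new ReplicaOutputStreams(dataOut, checksumOut, checksum);
    try {
      outs.getDataOut().write('d');
      outs.getChecksumOut().write('c');
    } finally {
      outs.close();  // closes both underlying streams
    }

    // LengthInputStream (formerly FSDatasetInterface.MetaDataInputStream):
    // an InputStream that also reports the length of its source.
    byte[] meta = checksumOut.toByteArray();
    LengthInputStream metaIn =
        new LengthInputStream(new ByteArrayInputStream(meta), meta.length);
    System.out.println("metadata length = " + metaIn.getLength());
    metaIn.close();
  }
}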