svn merge -c 1301661 from trunk for HDFS-3088.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1301663 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-03-16 17:36:57 +00:00
parent fe0b59edfa
commit ca252c931d
27 changed files with 317 additions and 193 deletions

View File

@ -141,6 +141,8 @@ Release 0.23.3 - UNRELEASED
HDFS-3057. httpfs and hdfs launcher scripts should honor CATALINA_HOME
and HADOOP_LIBEXEC_DIR (rvs via tucu)
HDFS-3088. Move FSDatasetInterface inner classes to a package. (szetszwo)
OPTIMIZATIONS
HDFS-2477. Optimize computing the diff between a block report and the
namenode state. (Tomasz Nykiel via hairong)

View File

@ -44,7 +44,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@ -72,7 +72,7 @@ class BlockPoolSliceScanner {
private final AtomicLong lastScanTime = new AtomicLong();
private final DataNode datanode;
private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
private final FSDatasetInterface<? extends FsVolumeSpi> dataset;
private final SortedSet<BlockScanInfo> blockInfoSet
= new TreeSet<BlockScanInfo>();
@ -134,7 +134,7 @@ public int compareTo(BlockScanInfo other) {
}
BlockPoolSliceScanner(String bpid, DataNode datanode,
FSDatasetInterface<? extends FSVolumeInterface> dataset,
FSDatasetInterface<? extends FsVolumeSpi> dataset,
Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;

View File

@ -38,12 +38,12 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@ -86,7 +86,7 @@ class BlockReceiver implements Closeable {
private DataOutputStream mirrorOut;
private Daemon responder = null;
private DataTransferThrottler throttler;
private FSDataset.BlockWriteStreams streams;
private ReplicaOutputStreams streams;
private DatanodeInfo srcDataNode = null;
private Checksum partialCrc = null;
private final DataNode datanode;
@ -202,16 +202,16 @@ class BlockReceiver implements Closeable {
this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
this.checksumSize = diskChecksum.getChecksumSize();
this.out = streams.dataOut;
this.out = streams.getDataOut();
if (out instanceof FileOutputStream) {
this.outFd = ((FileOutputStream)out).getFD();
} else {
LOG.warn("Could not get file descriptor for outputstream of class " +
out.getClass());
}
this.cout = streams.checksumOut;
this.cout = streams.getChecksumOut();
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
cout, HdfsConstants.SMALL_BUFFER_SIZE));
// write data chunk header if creating a new replica
if (isCreate) {
BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
@ -856,13 +856,13 @@ private void computePartialChunkCrc(long blkoff, long ckoff,
//
byte[] buf = new byte[sizePartialChunk];
byte[] crcbuf = new byte[checksumSize];
FSDataset.BlockInputStreams instr = null;
ReplicaInputStreams instr = null;
try {
instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);
// open meta file and read in crc value computer earlier
IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length);
} finally {
IOUtils.closeStream(instr);
}

View File

@ -21,7 +21,7 @@
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
/**************************************************
* BlockVolumeChoosingPolicy allows a DataNode to
@ -33,7 +33,7 @@
*
***************************************************/
@InterfaceAudience.Private
public interface BlockVolumeChoosingPolicy<V extends FSVolumeInterface> {
public interface BlockVolumeChoosingPolicy<V extends FsVolumeSpi> {
/**
* Returns a specific FSVolume after applying a suitable choice algorithm

View File

@ -31,7 +31,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
/**
* DataBlockScanner manages block scanning for all the block pools. For each
@ -43,7 +43,7 @@
public class DataBlockScanner implements Runnable {
public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
private final DataNode datanode;
private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
private final FSDatasetInterface<? extends FsVolumeSpi> dataset;
private final Configuration conf;
/**
@ -55,7 +55,7 @@ public class DataBlockScanner implements Runnable {
Thread blockScannerThread = null;
DataBlockScanner(DataNode datanode,
FSDatasetInterface<? extends FSVolumeInterface> dataset,
FSDatasetInterface<? extends FsVolumeSpi> dataset,
Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;

View File

@ -123,8 +123,8 @@
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
@ -235,7 +235,7 @@ public static InetSocketAddress createSocketAddr(String target
volatile boolean shouldRun = true;
private BlockPoolManager blockPoolManager;
volatile FSDatasetInterface<? extends FSVolumeInterface> data = null;
volatile FSDatasetInterface<? extends FsVolumeSpi> data = null;
private String clusterId = null;
public final static String EMPTY_DEL_HINT = "";

View File

@ -55,7 +55,7 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
@ -511,7 +511,7 @@ public void blockChecksum(final ExtendedBlock block,
checkAccess(out, true, block, blockToken,
Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
updateCurrentThreadName("Reading metadata for block " + block);
final MetaDataInputStream metadataIn =
final LengthInputStream metadataIn =
datanode.data.getMetaDataInputStream(block);
final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));

View File

@ -43,7 +43,7 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.Daemon;
/**
@ -157,13 +157,13 @@ static class ScanInfo implements Comparable<ScanInfo> {
private final long blockId;
private final File metaFile;
private final File blockFile;
private final FSVolumeInterface volume;
private final FsVolumeSpi volume;
ScanInfo(long blockId) {
this(blockId, null, null, null);
}
ScanInfo(long blockId, File blockFile, File metaFile, FSVolumeInterface vol) {
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
this.blockId = blockId;
this.metaFile = metaFile;
this.blockFile = blockFile;
@ -182,7 +182,7 @@ long getBlockId() {
return blockId;
}
FSVolumeInterface getVolume() {
FsVolumeSpi getVolume() {
return volume;
}
@ -412,8 +412,8 @@ private void addDifference(LinkedList<ScanInfo> diffRecord,
/** Is the given volume still valid in the dataset? */
private static boolean isValid(final FSDatasetInterface<?> dataset,
final FSVolumeInterface volume) {
for (FSVolumeInterface vol : dataset.getVolumes()) {
final FsVolumeSpi volume) {
for (FsVolumeSpi vol : dataset.getVolumes()) {
if (vol == volume) {
return true;
}
@ -424,7 +424,7 @@ private static boolean isValid(final FSDatasetInterface<?> dataset,
/** Get lists of blocks on the disk sorted by blockId, per blockpool */
private Map<String, ScanInfo[]> getDiskReport() {
// First get list of data directories
final List<? extends FSVolumeInterface> volumes = dataset.getVolumes();
final List<? extends FsVolumeSpi> volumes = dataset.getVolumes();
ArrayList<ScanInfoPerBlockPool> dirReports =
new ArrayList<ScanInfoPerBlockPool>(volumes.size());
@ -473,9 +473,9 @@ private static boolean isBlockMetaFile(String blockId, String metaFile) {
private static class ReportCompiler
implements Callable<ScanInfoPerBlockPool> {
private FSVolumeInterface volume;
private FsVolumeSpi volume;
public ReportCompiler(FSVolumeInterface volume) {
public ReportCompiler(FsVolumeSpi volume) {
this.volume = volume;
}
@ -492,7 +492,7 @@ public ScanInfoPerBlockPool call() throws Exception {
}
/** Compile list {@link ScanInfo} for the blocks in the directory <dir> */
private LinkedList<ScanInfo> compileReport(FSVolumeInterface vol, File dir,
private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol, File dir,
LinkedList<ScanInfo> report) {
File[] files;
try {

View File

@ -61,7 +61,10 @@
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@ -548,7 +551,7 @@ public void shutdown() {
*
* It uses the {@link FSDataset} object for synchronization.
*/
static class FSVolume implements FSVolumeInterface {
static class FSVolume implements FsVolumeSpi {
private final FSDataset dataset;
private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>();
private final File currentDir; // <StorageDirectory>/current
@ -865,7 +868,7 @@ private long getCapacity() throws IOException {
private long getRemaining() throws IOException {
long remaining = 0L;
for (FSVolumeInterface vol : volumes) {
for (FsVolumeSpi vol : volumes) {
remaining += vol.getAvailable();
}
return remaining;
@ -1052,13 +1055,13 @@ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
}
@Override // FSDatasetInterface
public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
throws IOException {
final File meta = getMetaFile(b);
if (meta == null || !meta.exists()) {
return null;
}
return new MetaDataInputStream(new FileInputStream(meta), meta.length());
return new LengthInputStream(new FileInputStream(meta), meta.length());
}
private final DataNode datanode;
@ -1287,7 +1290,7 @@ private ReplicaInfo getReplicaInfo(String bpid, long blkid)
* Returns handles to the block file and its metadata file
*/
@Override // FSDatasetInterface
public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b,
public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long ckoff) throws IOException {
ReplicaInfo info = getReplicaInfo(b);
File blockFile = info.getBlockFile();
@ -1300,7 +1303,7 @@ public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b,
if (ckoff > 0) {
metaInFile.seek(ckoff);
}
return new BlockInputStreams(new FileInputStream(blockInFile.getFD()),
return new ReplicaInputStreams(new FileInputStream(blockInFile.getFD()),
new FileInputStream(metaInFile.getFD()));
}
@ -1742,9 +1745,9 @@ public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
* last checksum will be overwritten.
*/
@Override // FSDatasetInterface
public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams streams,
public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams,
int checksumSize) throws IOException {
FileOutputStream file = (FileOutputStream) streams.checksumOut;
FileOutputStream file = (FileOutputStream) streams.getChecksumOut();
FileChannel channel = file.getChannel();
long oldPos = channel.position();
long newPos = oldPos - checksumSize;
@ -2195,7 +2198,7 @@ public String getStorageInfo() {
*/
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FSVolumeInterface vol) {
File diskMetaFile, FsVolumeSpi vol) {
Block corruptBlock = null;
ReplicaInfo memBlockInfo;
synchronized (this) {

View File

@ -18,12 +18,9 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.Closeable;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
@ -34,11 +31,13 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.ReflectionUtils;
@ -50,7 +49,7 @@
*
*/
@InterfaceAudience.Private
public interface FSDatasetInterface<V extends FSDatasetInterface.FSVolumeInterface>
public interface FSDatasetInterface<V extends FsVolumeSpi>
extends FSDatasetMBean {
/**
* A factory for creating FSDatasetInterface objects.
@ -77,24 +76,6 @@ public boolean isSimulated() {
}
}
/**
* This is an interface for the underlying volume.
* @see org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume
*/
interface FSVolumeInterface {
/** @return a list of block pools. */
public String[] getBlockPoolList();
/** @return the available storage space in bytes. */
public long getAvailable() throws IOException;
/** @return the path to the volume */
public String getPath(String bpid) throws IOException;
/** @return the directory for the finalized blocks in the block pool. */
public File getFinalizedDir(String bpid) throws IOException;
}
/**
* Create rolling logs.
*
@ -121,24 +102,7 @@ interface FSVolumeInterface {
* as corrupted.
*/
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FSVolumeInterface vol);
/**
* This class provides the input stream and length of the metadata
* of a block
*
*/
static class MetaDataInputStream extends FilterInputStream {
MetaDataInputStream(InputStream stream, long len) {
super(stream);
length = len;
}
private long length;
public long getLength() {
return length;
}
}
File diskMetaFile, FsVolumeSpi vol);
/**
* @param b - the block
@ -146,7 +110,7 @@ public long getLength() {
* otherwise, return null.
* @throws IOException
*/
public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b
public LengthInputStream getMetaDataInputStream(ExtendedBlock b
) throws IOException;
/**
@ -197,57 +161,9 @@ public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset)
* starting at the offset
* @throws IOException
*/
public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
long ckoff) throws IOException;
/**
*
* This class contains the output streams for the data and checksum
* of a block
*
*/
static class BlockWriteStreams {
OutputStream dataOut;
OutputStream checksumOut;
DataChecksum checksum;
BlockWriteStreams(OutputStream dOut, OutputStream cOut,
DataChecksum checksum) {
dataOut = dOut;
checksumOut = cOut;
this.checksum = checksum;
}
void close() {
IOUtils.closeStream(dataOut);
IOUtils.closeStream(checksumOut);
}
DataChecksum getChecksum() {
return checksum;
}
}
/**
* This class contains the input streams for the data and checksum
* of a block
*/
static class BlockInputStreams implements Closeable {
final InputStream dataIn;
final InputStream checksumIn;
BlockInputStreams(InputStream dataIn, InputStream checksumIn) {
this.dataIn = dataIn;
this.checksumIn = checksumIn;
}
@Override
public void close() {
IOUtils.closeStream(dataIn);
IOUtils.closeStream(checksumIn);
}
}
/**
* Creates a temporary replica and returns the meta information of the replica
*
@ -395,7 +311,7 @@ public void recoverClose(ExtendedBlock b,
* @param checksumSize number of bytes each checksum has
* @throws IOException
*/
public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams stream,
public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams stream,
int checksumSize) throws IOException;
/**

View File

@ -21,7 +21,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
/**
* This class describes a replica that has been finalized.
@ -38,7 +38,7 @@ class FinalizedReplica extends ReplicaInfo {
* @param dir directory path where block and meta files are located
*/
FinalizedReplica(long blockId, long len, long genStamp,
FSVolumeInterface vol, File dir) {
FsVolumeSpi vol, File dir) {
super(blockId, len, genStamp, vol, dir);
}
@ -48,7 +48,7 @@ class FinalizedReplica extends ReplicaInfo {
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
*/
FinalizedReplica(Block block, FSVolumeInterface vol, File dir) {
FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
super(block, vol, dir);
}

View File

@ -21,7 +21,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
/** This class represents replicas being written.
* Those are the replicas that
@ -36,7 +36,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
* @param dir directory path where block and meta files are located
*/
ReplicaBeingWritten(long blockId, long genStamp,
FSVolumeInterface vol, File dir) {
FsVolumeSpi vol, File dir) {
super( blockId, genStamp, vol, dir);
}
@ -48,7 +48,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
* @param writer a thread that is writing to this replica
*/
ReplicaBeingWritten(Block block,
FSVolumeInterface vol, File dir, Thread writer) {
FsVolumeSpi vol, File dir, Thread writer) {
super( block, vol, dir, writer);
}
@ -62,7 +62,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
* @param writer a thread that is writing to this replica
*/
ReplicaBeingWritten(long blockId, long len, long genStamp,
FSVolumeInterface vol, File dir, Thread writer ) {
FsVolumeSpi vol, File dir, Thread writer ) {
super( blockId, len, genStamp, vol, dir, writer);
}

View File

@ -24,8 +24,8 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
@ -53,7 +53,7 @@ class ReplicaInPipeline extends ReplicaInfo
* @param state replica state
*/
ReplicaInPipeline(long blockId, long genStamp,
FSVolumeInterface vol, File dir) {
FsVolumeSpi vol, File dir) {
this( blockId, 0L, genStamp, vol, dir, Thread.currentThread());
}
@ -65,7 +65,7 @@ class ReplicaInPipeline extends ReplicaInfo
* @param writer a thread that is writing to this replica
*/
ReplicaInPipeline(Block block,
FSVolumeInterface vol, File dir, Thread writer) {
FsVolumeSpi vol, File dir, Thread writer) {
this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
vol, dir, writer);
}
@ -80,7 +80,7 @@ class ReplicaInPipeline extends ReplicaInfo
* @param writer a thread that is writing to this replica
*/
ReplicaInPipeline(long blockId, long len, long genStamp,
FSVolumeInterface vol, File dir, Thread writer ) {
FsVolumeSpi vol, File dir, Thread writer ) {
super( blockId, len, genStamp, vol, dir);
this.bytesAcked = len;
this.bytesOnDisk = len;
@ -168,7 +168,7 @@ public int hashCode() {
}
@Override // ReplicaInPipelineInterface
public BlockWriteStreams createStreams(boolean isCreate,
public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum) throws IOException {
File blockFile = getBlockFile();
File metaFile = getMetaFile();
@ -234,7 +234,7 @@ public BlockWriteStreams createStreams(boolean isCreate,
blockOut.getChannel().position(blockDiskSize);
crcOut.getChannel().position(crcDiskSize);
}
return new BlockWriteStreams(blockOut, crcOut, checksum);
return new ReplicaOutputStreams(blockOut, crcOut, checksum);
} catch (IOException e) {
IOUtils.closeStream(blockOut);
IOUtils.closeStream(metaRAF);

View File

@ -19,7 +19,7 @@
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.util.DataChecksum;
/**
@ -66,6 +66,6 @@ interface ReplicaInPipelineInterface extends Replica {
* @return output streams for writing
* @throws IOException if any error occurs
*/
public BlockWriteStreams createStreams(boolean isCreate,
public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum) throws IOException;
}

View File

@ -26,7 +26,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils;
/**
@ -36,7 +36,7 @@
@InterfaceAudience.Private
abstract public class ReplicaInfo extends Block implements Replica {
/** volume where the replica belongs */
private FSVolumeInterface volume;
private FsVolumeSpi volume;
/** directory where block & meta files belong */
private File dir;
@ -47,7 +47,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
*/
ReplicaInfo(long blockId, long genStamp, FSVolumeInterface vol, File dir) {
ReplicaInfo(long blockId, long genStamp, FsVolumeSpi vol, File dir) {
this( blockId, 0L, genStamp, vol, dir);
}
@ -57,7 +57,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
*/
ReplicaInfo(Block block, FSVolumeInterface vol, File dir) {
ReplicaInfo(Block block, FsVolumeSpi vol, File dir) {
this(block.getBlockId(), block.getNumBytes(),
block.getGenerationStamp(), vol, dir);
}
@ -71,7 +71,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
* @param dir directory path where block and meta files are located
*/
ReplicaInfo(long blockId, long len, long genStamp,
FSVolumeInterface vol, File dir) {
FsVolumeSpi vol, File dir) {
super(blockId, len, genStamp);
this.volume = vol;
this.dir = dir;
@ -113,14 +113,14 @@ File getMetaFile() {
* Get the volume where this replica is located on disk
* @return the volume where this replica is located on disk
*/
FSVolumeInterface getVolume() {
FsVolumeSpi getVolume() {
return volume;
}
/**
* Set the volume where this replica is located on disk
*/
void setVolume(FSVolumeInterface vol) {
void setVolume(FsVolumeSpi vol) {
this.volume = vol;
}

View File

@ -20,7 +20,7 @@
import java.io.File;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
/**
@ -145,7 +145,7 @@ void setDir(File dir) {
}
@Override //ReplicaInfo
void setVolume(FSVolumeInterface vol) {
void setVolume(FsVolumeSpi vol) {
super.setVolume(vol);
original.setVolume(vol);
}

View File

@ -21,7 +21,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
/**
* This class represents a replica that is waiting to be recovered.
@ -44,7 +44,7 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo {
* @param dir directory path where block and meta files are located
*/
ReplicaWaitingToBeRecovered(long blockId, long len, long genStamp,
FSVolumeInterface vol, File dir) {
FsVolumeSpi vol, File dir) {
super(blockId, len, genStamp, vol, dir);
}
@ -54,7 +54,7 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo {
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
*/
ReplicaWaitingToBeRecovered(Block block, FSVolumeInterface vol, File dir) {
ReplicaWaitingToBeRecovered(Block block, FsVolumeSpi vol, File dir) {
super(block, vol, dir);
}

View File

@ -20,10 +20,10 @@
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
public class RoundRobinVolumesPolicy<V extends FSVolumeInterface>
public class RoundRobinVolumesPolicy<V extends FsVolumeSpi>
implements BlockVolumeChoosingPolicy<V> {
private int curVolume = 0;

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.File;
import java.io.IOException;
/**
* This is an interface for the underlying volume.
*/
public interface FsVolumeSpi {
/** @return a list of block pools. */
public String[] getBlockPoolList();
/** @return the available storage space in bytes. */
public long getAvailable() throws IOException;
/** @return the path to the volume */
public String getPath(String bpid) throws IOException;
/** @return the directory for the finalized blocks in the block pool. */
public File getFinalizedDir(String bpid) throws IOException;
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.FilterInputStream;
import java.io.InputStream;
/**
* An input stream with length.
*/
public class LengthInputStream extends FilterInputStream {
private final long length;
/**
* Create an stream.
* @param in the underlying input stream.
* @param length the length of the stream.
*/
public LengthInputStream(InputStream in, long length) {
super(in);
this.length = length;
}
/** @return the length. */
public long getLength() {
return length;
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.InputStream;
import org.apache.hadoop.io.IOUtils;
/**
* Contains the input streams for the data and checksum of a replica.
*/
public class ReplicaInputStreams implements Closeable {
private final InputStream dataIn;
private final InputStream checksumIn;
/** Create an object with a data input stream and a checksum input stream. */
public ReplicaInputStreams(InputStream dataIn, InputStream checksumIn) {
this.dataIn = dataIn;
this.checksumIn = checksumIn;
}
/** @return the data input stream. */
public InputStream getDataIn() {
return dataIn;
}
/** @return the checksum input stream. */
public InputStream getChecksumIn() {
return checksumIn;
}
@Override
public void close() {
IOUtils.closeStream(dataIn);
IOUtils.closeStream(checksumIn);
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.OutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
/**
* Contains the output streams for the data and checksum of a replica.
*/
public class ReplicaOutputStreams implements Closeable {
private final OutputStream dataOut;
private final OutputStream checksumOut;
private final DataChecksum checksum;
/**
* Create an object with a data output stream, a checksum output stream
* and a checksum.
*/
public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut,
DataChecksum checksum) {
this.dataOut = dataOut;
this.checksumOut = checksumOut;
this.checksum = checksum;
}
/** @return the data output stream. */
public OutputStream getDataOut() {
return dataOut;
}
/** @return the checksum output stream. */
public OutputStream getChecksumOut() {
return checksumOut;
}
/** @return the checksum. */
public DataChecksum getChecksum() {
return checksum;
}
@Override
public void close() {
IOUtils.closeStream(dataOut);
IOUtils.closeStream(checksumOut);
}
}

View File

@ -39,6 +39,10 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@ -61,8 +65,7 @@
*
* Note the synchronization is coarse grained - it is at each method.
*/
public class SimulatedFSDataset
implements FSDatasetInterface<FSDatasetInterface.FSVolumeInterface> {
public class SimulatedFSDataset implements FSDatasetInterface<FsVolumeSpi> {
static class Factory extends FSDatasetInterface.Factory<SimulatedFSDataset> {
@Override
public SimulatedFSDataset createFSDatasetInterface(DataNode datanode,
@ -215,14 +218,14 @@ synchronized boolean isFinalized() {
}
@Override
synchronized public BlockWriteStreams createStreams(boolean isCreate,
synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum) throws IOException {
if (finalized) {
throw new IOException("Trying to write to a finalized replica "
+ theBlock);
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new BlockWriteStreams(oStream, crcStream, requestedChecksum);
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum);
}
}
@ -688,13 +691,13 @@ public synchronized InputStream getBlockInputStream(ExtendedBlock b,
/** Not supported */
@Override // FSDatasetInterface
public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
long ckoff) throws IOException {
throw new IOException("Not supported");
}
@Override // FSDatasetInterface
public synchronized MetaDataInputStream getMetaDataInputStream(ExtendedBlock b
public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b
) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
@ -706,7 +709,7 @@ public synchronized MetaDataInputStream getMetaDataInputStream(ExtendedBlock b
" is being written, its meta cannot be read");
}
final SimulatedInputStream sin = binfo.getMetaIStream();
return new MetaDataInputStream(sin, sin.getLength());
return new LengthInputStream(sin, sin.getLength());
}
@Override
@ -716,7 +719,7 @@ public void checkDataDir() throws DiskErrorException {
@Override // FSDatasetInterface
public synchronized void adjustCrcChannelPosition(ExtendedBlock b,
BlockWriteStreams stream,
ReplicaOutputStreams stream,
int checksumSize)
throws IOException {
}
@ -959,12 +962,12 @@ public String[] getBlockPoolList() {
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FSVolumeInterface vol) {
File diskMetaFile, FsVolumeSpi vol) {
throw new UnsupportedOperationException();
}
@Override
public List<FSVolumeInterface> getVolumes() {
public List<FsVolumeSpi> getVolumes() {
throw new UnsupportedOperationException();
}

View File

@ -36,7 +36,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -535,11 +535,11 @@ public void testNotMatchedReplicaID() throws IOException {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
BlockWriteStreams streams = null;
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
streams.checksumOut.write('a');
streams.getChecksumOut().write('a');
dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
try {
dn.syncBlock(rBlock, initBlockRecords(dn));

View File

@ -37,7 +37,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.junit.Test;
@ -98,7 +98,7 @@ private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt)
out.write(writeBuf);
out.hflush();
DataNode dn = cluster.getDataNodes().get(0);
for (FSVolumeInterface v : dn.data.getVolumes()) {
for (FsVolumeSpi v : dn.data.getVolumes()) {
FSVolume volume = (FSVolume)v;
File currentDir = volume.getCurrentDir().getParentFile().getParentFile();
File rbwDir = new File(currentDir, "rbw");

View File

@ -21,7 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Assert;
@ -33,19 +33,19 @@ public class TestRoundRobinVolumesPolicy {
// Test the Round-Robin block-volume choosing algorithm.
@Test
public void testRR() throws Exception {
final List<FSVolumeInterface> volumes = new ArrayList<FSVolumeInterface>();
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
// First volume, with 100 bytes of space.
volumes.add(Mockito.mock(FSVolumeInterface.class));
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
// Second volume, with 200 bytes of space.
volumes.add(Mockito.mock(FSVolumeInterface.class));
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
@SuppressWarnings("unchecked")
final RoundRobinVolumesPolicy<FSVolumeInterface> policy =
(RoundRobinVolumesPolicy<FSVolumeInterface>)ReflectionUtils.newInstance(
final RoundRobinVolumesPolicy<FsVolumeSpi> policy =
(RoundRobinVolumesPolicy<FsVolumeSpi>)ReflectionUtils.newInstance(
RoundRobinVolumesPolicy.class, null);
// Test two rounds of round-robin choosing
@ -71,18 +71,18 @@ public void testRR() throws Exception {
@Test
public void testRRPolicyExceptionMessage()
throws Exception {
final List<FSVolumeInterface> volumes = new ArrayList<FSVolumeInterface>();
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
// First volume, with 500 bytes of space.
volumes.add(Mockito.mock(FSVolumeInterface.class));
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(0).getAvailable()).thenReturn(500L);
// Second volume, with 600 bytes of space.
volumes.add(Mockito.mock(FSVolumeInterface.class));
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
final RoundRobinVolumesPolicy<FSVolumeInterface> policy
= new RoundRobinVolumesPolicy<FSVolumeInterface>();
final RoundRobinVolumesPolicy<FsVolumeSpi> policy
= new RoundRobinVolumesPolicy<FsVolumeSpi>();
int blockSize = 700;
try {
policy.chooseVolume(volumes, blockSize);

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.util.DataChecksum;
/**
@ -63,10 +63,10 @@ int addSomeBlocks(SimulatedFSDataset fsdataset, int startingBlockId)
// we pass expected len as zero, - fsdataset should use the sizeof actual
// data written
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
BlockWriteStreams out = bInfo.createStreams(true,
ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
try {
OutputStream dataOut = out.dataOut;
OutputStream dataOut = out.getDataOut();
assertEquals(0, fsdataset.getLength(b));
for (int j=1; j <= blockIdToLen(i); ++j) {
dataOut.write(j);