svn merge -c 1305603 from trunk for HDFS-3089. Move FSDatasetInterface and the related classes to a package.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1305606 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2012-03-26 21:19:51 +00:00
parent e6e96aa2f0
commit 6fc0b02fd4
21 changed files with 210 additions and 201 deletions


@@ -165,6 +165,9 @@ Release 0.23.3 - UNRELEASED
 HDFS-3071. haadmin failover command does not provide enough detail when
 target NN is not ready to be active. (todd)
+HDFS-3089. Move FSDatasetInterface and the related classes to a package.
+(szetszwo)
 OPTIMIZATIONS
 HDFS-2477. Optimize computing the diff between a block report and the


@@ -237,9 +237,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
 public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
 public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
-public static final String DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY = "dfs.datanode.block.volume.choice.policy";
-public static final String DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY_DEFAULT =
-"org.apache.hadoop.hdfs.server.datanode.RoundRobinVolumesPolicy";
 public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
 public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
 public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
@@ -303,6 +300,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 //Keys with no defaults
 public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
 public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
+public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
 public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
 public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup";
 public static final String DFS_NAMENODE_PLUGINS_KEY = "dfs.namenode.plugins";


@@ -44,7 +44,9 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
@@ -72,7 +74,7 @@ class BlockPoolSliceScanner {
 private final AtomicLong lastScanTime = new AtomicLong();
 private final DataNode datanode;
-private final FSDatasetInterface<? extends FsVolumeSpi> dataset;
+private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
 private final SortedSet<BlockScanInfo> blockInfoSet
 = new TreeSet<BlockScanInfo>();
@@ -134,8 +136,7 @@ public int compareTo(BlockScanInfo other) {
 }
 BlockPoolSliceScanner(String bpid, DataNode datanode,
-FSDatasetInterface<? extends FsVolumeSpi> dataset,
-Configuration conf) {
+FsDatasetSpi<? extends FsVolumeSpi> dataset, Configuration conf) {
 this.datanode = datanode;
 this.dataset = dataset;
 this.blockPoolId = bpid;


@@ -31,6 +31,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 /**
@@ -43,7 +44,7 @@
 public class DataBlockScanner implements Runnable {
 public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
 private final DataNode datanode;
-private final FSDatasetInterface<? extends FsVolumeSpi> dataset;
+private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
 private final Configuration conf;
 /**
@@ -55,7 +56,7 @@ public class DataBlockScanner implements Runnable {
 Thread blockScannerThread = null;
 DataBlockScanner(DataNode datanode,
-FSDatasetInterface<? extends FsVolumeSpi> dataset,
+FsDatasetSpi<? extends FsVolumeSpi> dataset,
 Configuration conf) {
 this.datanode = datanode;
 this.dataset = dataset;


@@ -124,6 +124,7 @@
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
@@ -235,7 +236,7 @@ public static InetSocketAddress createSocketAddr(String target
 volatile boolean shouldRun = true;
 private BlockPoolManager blockPoolManager;
-volatile FSDatasetInterface<? extends FsVolumeSpi> data = null;
+volatile FsDatasetSpi<? extends FsVolumeSpi> data = null;
 private String clusterId = null;
 public final static String EMPTY_DEL_HINT = "";
@@ -814,8 +815,8 @@ int getBpOsCount() {
 * handshake with the the first namenode is completed.
 */
 private void initStorage(final NamespaceInfo nsInfo) throws IOException {
-final FSDatasetInterface.Factory<? extends FSDatasetInterface<?>> factory
-= FSDatasetInterface.Factory.getFactory(conf);
+final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
+= FsDatasetSpi.Factory.getFactory(conf);
 if (!factory.isSimulated()) {
 final StartupOption startOpt = getStartupOption(conf);
@@ -833,7 +834,7 @@ private void initStorage(final NamespaceInfo nsInfo) throws IOException {
 synchronized(this) {
 if (data == null) {
-data = factory.createFSDatasetInterface(this, storage, conf);
+data = factory.newInstance(this, storage, conf);
 }
 }
 }
@@ -1700,7 +1701,7 @@ public void scheduleAllBlockReport(long delay) {
 *
 * @return the fsdataset that stores the blocks
 */
-FSDatasetInterface<?> getFSDataset() {
+FsDatasetSpi<?> getFSDataset() {
 return data;
 }


@@ -43,6 +43,7 @@
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.Daemon;
@@ -55,7 +56,7 @@ public class DirectoryScanner implements Runnable {
 private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
 private final DataNode datanode;
-private final FSDatasetInterface<?> dataset;
+private final FsDatasetSpi<?> dataset;
 private final ExecutorService reportCompileThreadPool;
 private final ScheduledExecutorService masterThread;
 private final long scanPeriodMsecs;
@@ -219,7 +220,7 @@ public long getGenStamp() {
 }
 }
-DirectoryScanner(DataNode dn, FSDatasetInterface<?> dataset, Configuration conf) {
+DirectoryScanner(DataNode dn, FsDatasetSpi<?> dataset, Configuration conf) {
 this.datanode = dn;
 this.dataset = dataset;
 int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
@@ -411,7 +412,7 @@ private void addDifference(LinkedList<ScanInfo> diffRecord,
 }
 /** Is the given volume still valid in the dataset? */
-private static boolean isValid(final FSDatasetInterface<?> dataset,
+private static boolean isValid(final FsDatasetSpi<?> dataset,
 final FsVolumeSpi volume) {
 for (FsVolumeSpi vol : dataset.getVolumes()) {
 if (vol == volume) {


@@ -61,10 +61,14 @@
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@@ -82,13 +86,13 @@
 *
 ***************************************************/
 @InterfaceAudience.Private
-class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
+public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
 /**
 * A factory for creating FSDataset objects.
 */
-static class Factory extends FSDatasetInterface.Factory<FSDataset> {
+public static class Factory extends FsDatasetSpi.Factory<FSDataset> {
 @Override
-public FSDataset createFSDatasetInterface(DataNode datanode,
+public FSDataset newInstance(DataNode datanode,
 DataStorage storage, Configuration conf) throws IOException {
 return new FSDataset(datanode, storage, conf);
 }
@@ -823,11 +827,11 @@ static class FSVolumeSet {
 */
 private volatile List<FSVolume> volumes = null;
-BlockVolumeChoosingPolicy<FSVolume> blockChooser;
+final VolumeChoosingPolicy<FSVolume> blockChooser;
 int numFailedVolumes;
 FSVolumeSet(List<FSVolume> volumes, int failedVols,
-BlockVolumeChoosingPolicy<FSVolume> blockChooser) {
+VolumeChoosingPolicy<FSVolume> blockChooser) {
 this.volumes = Collections.unmodifiableList(volumes);
 this.blockChooser = blockChooser;
 this.numFailedVolumes = failedVols;
@@ -1018,7 +1022,7 @@ private static long parseGenerationStamp(File blockFile, File metaFile
 }
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public List<FSVolume> getVolumes() {
 return volumes.volumes;
 }
@@ -1029,7 +1033,7 @@ public synchronized FSVolume getVolume(final ExtendedBlock b) {
 return r != null? (FSVolume)r.getVolume(): null;
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized Block getStoredBlock(String bpid, long blkid)
 throws IOException {
 File blockfile = getFile(bpid, blkid);
@@ -1066,7 +1070,7 @@ ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
 return null;
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
 throws IOException {
 final File meta = getMetaFile(b);
@@ -1125,11 +1129,11 @@ private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
 volumeMap = new ReplicasMap(this);
 @SuppressWarnings("unchecked")
-final BlockVolumeChoosingPolicy<FSVolume> blockChooserImpl =
+final VolumeChoosingPolicy<FSVolume> blockChooserImpl =
 ReflectionUtils.newInstance(conf.getClass(
-DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
-RoundRobinVolumesPolicy.class,
-BlockVolumeChoosingPolicy.class), conf);
+DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
+RoundRobinVolumeChoosingPolicy.class,
+VolumeChoosingPolicy.class), conf);
 volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl);
 volumes.getVolumeMap(volumeMap);
@@ -1164,7 +1168,7 @@ public long getBlockPoolUsed(String bpid) throws IOException {
 /**
 * Return true - if there are still valid volumes on the DataNode.
 */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public boolean hasEnoughResource() {
 return getVolumes().size() >= validVolsRequired;
 }
@@ -1199,7 +1203,7 @@ public int getNumFailedVolumes() {
 /**
 * Find the block's on-disk length
 */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public long getLength(ExtendedBlock b) throws IOException {
 return getBlockFile(b).length();
 }
@@ -1243,7 +1247,7 @@ private File getBlockFileNoExistsCheck(ExtendedBlock b)
 return f;
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public InputStream getBlockInputStream(ExtendedBlock b,
 long seekOffset) throws IOException {
 File blockFile = getBlockFileNoExistsCheck(b);
@@ -1301,7 +1305,7 @@ private ReplicaInfo getReplicaInfo(String bpid, long blkid)
 /**
 * Returns handles to the block file and its metadata file
 */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
 long blkOffset, long ckoff) throws IOException {
 ReplicaInfo info = getReplicaInfo(b);
@@ -1406,7 +1410,7 @@ static private void truncateBlock(File blockFile, File metaFile,
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
 long newGS, long expectedBlockLen) throws IOException {
 // If the block was successfully finalized because all packets
@@ -1547,7 +1551,7 @@ private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS,
 return replicaInfo;
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
 long newGS, long expectedBlockLen) throws IOException {
 DataNode.LOG.info("Recover failed append to " + b);
@@ -1564,7 +1568,7 @@ public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
 }
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public void recoverClose(ExtendedBlock b, long newGS,
 long expectedBlockLen) throws IOException {
 DataNode.LOG.info("Recover failed close " + b);
@@ -1606,7 +1610,7 @@ private void bumpReplicaGS(ReplicaInfo replicaInfo,
 }
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
 throws IOException {
 ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
@@ -1626,7 +1630,7 @@ public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
 return newReplicaInfo;
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
 long newGS, long minBytesRcvd, long maxBytesRcvd)
 throws IOException {
@@ -1671,7 +1675,7 @@ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
 return rbw;
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized ReplicaInPipelineInterface convertTemporaryToRbw(
 final ExtendedBlock b) throws IOException {
 final long blockId = b.getBlockId();
@@ -1732,7 +1736,7 @@ public synchronized ReplicaInPipelineInterface convertTemporaryToRbw(
 return rbw;
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
 throws IOException {
 ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
@@ -1756,7 +1760,7 @@ public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
 * Sets the offset in the meta file so that the
 * last checksum will be overwritten.
 */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams,
 int checksumSize) throws IOException {
 FileOutputStream file = (FileOutputStream) streams.getChecksumOut();
@@ -1781,7 +1785,7 @@ public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams strea
 /**
 * Complete the block write!
 */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
 ReplicaInfo replicaInfo = getReplicaInfo(b);
 if (replicaInfo.getState() == ReplicaState.FINALIZED) {
@@ -1818,7 +1822,7 @@ private synchronized FinalizedReplica finalizeReplica(String bpid,
 /**
 * Remove the temporary block file (if any)
 */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
 ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
 b.getLocalBlock());
@@ -1863,7 +1867,7 @@ private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
 /**
 * Generates a block report from the in-memory block map.
 */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public BlockListAsLongs getBlockReport(String bpid) {
 int size = volumeMap.size(bpid);
 ArrayList<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(size);
@@ -1914,7 +1918,7 @@ public synchronized List<Block> getFinalizedBlocks(String bpid) {
 * Check whether the given block is a valid one.
 * valid means finalized
 */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public boolean isValidBlock(ExtendedBlock b) {
 return isValid(b, ReplicaState.FINALIZED);
 }
@@ -1922,7 +1926,7 @@ public boolean isValidBlock(ExtendedBlock b) {
 /**
 * Check whether the given block is a valid RBW.
 */
-@Override // {@link FSDatasetInterface}
+@Override // {@link FsDatasetSpi}
 public boolean isValidRbw(final ExtendedBlock b) {
 return isValid(b, ReplicaState.RBW);
 }
@@ -1987,7 +1991,7 @@ static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
 * could lazily garbage-collect the block, but why bother?
 * just get rid of it.
 */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
 boolean error = false;
 for (int i = 0; i < invalidBlks.length; i++) {
@@ -2053,7 +2057,7 @@ public void notifyNamenodeDeletedBlock(ExtendedBlock block){
 datanode.notifyNamenodeDeletedBlock(block);
 }
-@Override // {@link FSDatasetInterface}
+@Override // {@link FsDatasetSpi}
 public synchronized boolean contains(final ExtendedBlock block) {
 final long blockId = block.getLocalBlock().getBlockId();
 return getFile(block.getBlockPoolId(), blockId) != null;
@@ -2078,7 +2082,7 @@ File getFile(final String bpid, final long blockId) {
 * to these volumes
 * @throws DiskErrorException
 */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public void checkDataDir() throws DiskErrorException {
 long totalBlocks=0, removedBlocks=0;
 List<FSVolume> failedVols = volumes.checkDirs();
@@ -2122,7 +2126,7 @@ public void checkDataDir() throws DiskErrorException {
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public String toString() {
 return "FSDataset{dirpath='"+volumes+"'}";
 }
@@ -2153,7 +2157,7 @@ void registerMBean(final String storageId) {
 DataNode.LOG.info("Registered FSDatasetState MBean");
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public void shutdown() {
 if (mbeanName != null)
 MBeans.unregister(mbeanName);
@@ -2334,7 +2338,7 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile,
 /**
 * @deprecated use {@link #fetchReplicaInfo(String, long)} instead.
 */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 @Deprecated
 public ReplicaInfo getReplica(String bpid, long blockId) {
 return volumeMap.get(bpid, blockId);
@@ -2346,7 +2350,7 @@ public synchronized String getReplicaString(String bpid, long blockId) {
 return r == null? "null": r.toString();
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized ReplicaRecoveryInfo initReplicaRecovery(
 RecoveringBlock rBlock) throws IOException {
 return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(),
@@ -2419,7 +2423,7 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
 return rur.createInfo();
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized String updateReplicaUnderRecovery(
 final ExtendedBlock oldBlock,
 final long recoveryId,
@@ -2501,7 +2505,7 @@ private FinalizedReplica updateReplicaUnderRecovery(
 return finalizeReplica(bpid, rur);
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
 throws IOException {
 final Replica replica = getReplicaInfo(block.getBlockPoolId(),
@@ -2584,7 +2588,7 @@ public Map<String, Object> getVolumeInfoMap() {
 return info;
 }
-@Override //FSDatasetInterface
+@Override //FsDatasetSpi
 public synchronized void deleteBlockPool(String bpid, boolean force)
 throws IOException {
 if (!force) {
@@ -2602,7 +2606,7 @@ public synchronized void deleteBlockPool(String bpid, boolean force)
 }
 }
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
 public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
 throws IOException {
 File datafile = getBlockFile(block);


@@ -25,7 +25,7 @@
 /**
 * This defines the interface of a replica in Pipeline that's being written to
 */
-interface ReplicaInPipelineInterface extends Replica {
+public interface ReplicaInPipelineInterface extends Replica {
 /**
 * Set the number of bytes received
 * @param bytesReceived number of bytes received


@@ -15,7 +15,7 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 import java.io.File;
@@ -31,10 +31,11 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@@ -42,19 +43,16 @@
 import org.apache.hadoop.util.ReflectionUtils;
 /**
-* This is an interface for the underlying storage that stores blocks for
-* a data node.
-* Examples are the FSDataset (which stores blocks on dirs) and
-* SimulatedFSDataset (which simulates data).
-*
+* This is a service provider interface for the underlying storage that
+* stores replicas for a data node.
+* The default implementation stores replicas on local drives.
 */
 @InterfaceAudience.Private
-public interface FSDatasetInterface<V extends FsVolumeSpi>
-extends FSDatasetMBean {
+public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 /**
-* A factory for creating FSDatasetInterface objects.
+* A factory for creating {@link FsDatasetSpi} objects.
 */
-public abstract class Factory<D extends FSDatasetInterface<?>> {
+public static abstract class Factory<D extends FsDatasetSpi<?>> {
 /** @return the configured factory. */
 public static Factory<?> getFactory(Configuration conf) {
 @SuppressWarnings("rawtypes")
@@ -65,10 +63,9 @@ public static Factory<?> getFactory(Configuration conf) {
 return ReflectionUtils.newInstance(clazz, conf);
 }
-/** Create a FSDatasetInterface object. */
-public abstract D createFSDatasetInterface(
-DataNode datanode, DataStorage storage, Configuration conf
-) throws IOException;
+/** Create a new object. */
+public abstract D newInstance(DataNode datanode, DataStorage storage,
+Configuration conf) throws IOException;
 /** Does the factory create simulated objects? */
 public boolean isSimulated() {
@@ -82,7 +79,8 @@ public boolean isSimulated() {
 * @param prefix the prefix of the log names.
 * @return rolling logs
 */
-public RollingLogs createRollingLogs(String bpid, String prefix) throws IOException;
+public RollingLogs createRollingLogs(String bpid, String prefix
+) throws IOException;
 /** @return a list of volumes. */
 public List<V> getVolumes();
@@ -167,15 +165,15 @@ public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset)
 public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
 long ckoff) throws IOException;
 /**
 * Creates a temporary replica and returns the meta information of the replica
 *
 * @param b block
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
-public ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
-throws IOException;
+public ReplicaInPipelineInterface createTemporary(ExtendedBlock b
+) throws IOException;
 /**
 * Creates a RBW replica and returns the meta info of the replica
@@ -184,7 +182,8 @@ public ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
 * @return the meta info of the replica which is being written to
 * @throws IOException if an error occurs
 */
-public ReplicaInPipelineInterface createRbw(ExtendedBlock b) throws IOException;
+public ReplicaInPipelineInterface createRbw(ExtendedBlock b
+) throws IOException;
 /**
 * Recovers a RBW replica and returns the meta info of the replica
@@ -197,8 +196,7 @@ public ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
 * @throws IOException if an error occurs
 */
 public ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
-long newGS, long minBytesRcvd, long maxBytesRcvd)
-throws IOException;
+long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException;
 /**
 * Covert a temporary replica to a RBW.
@@ -217,8 +215,8 @@ public ReplicaInPipelineInterface convertTemporaryToRbw(
 * @return the meata info of the replica which is being written to
 * @throws IOException
 */
-public ReplicaInPipelineInterface append(ExtendedBlock b,
-long newGS, long expectedBlockLen) throws IOException;
+public ReplicaInPipelineInterface append(ExtendedBlock b, long newGS,
+long expectedBlockLen) throws IOException;
 /**
 * Recover a failed append to a finalized replica
@@ -230,8 +228,8 @@ public ReplicaInPipelineInterface append(ExtendedBlock b,
 * @return the meta info of the replica which is being written to
 * @throws IOException
 */
-public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
-long newGS, long expectedBlockLen) throws IOException;
+public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS,
+long expectedBlockLen) throws IOException;
 /**
 * Recover a failed pipeline close
@@ -242,8 +240,8 @@ public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
 * @param expectedBlockLen the number of bytes the replica is expected to have
 * @throws IOException
 */
-public void recoverClose(ExtendedBlock b,
-long newGS, long expectedBlockLen) throws IOException;
+public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
+) throws IOException;
 /**
 * Finalizes the block previously opened for writing using writeToBlock.
@@ -300,7 +298,7 @@ public void recoverClose(ExtendedBlock b,
 * @throws DiskErrorException
 */
 public void checkDataDir() throws DiskErrorException;
 /**
 * Shutdown the FSDataset
 */
@@ -310,12 +308,12 @@ public void recoverClose(ExtendedBlock b,
 * Sets the file pointer of the checksum stream so that the last checksum
 * will be overwritten
 * @param b block
-* @param stream The stream for the data file and checksum file
+* @param outs The streams for the data file and checksum file
 * @param checksumSize number of bytes each checksum has
 * @throws IOException
 */
-public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams stream,
-int checksumSize) throws IOException;
+public void adjustCrcChannelPosition(ExtendedBlock b,
+ReplicaOutputStreams outs, int checksumSize) throws IOException;
 /**
 * Checks how many valid storage volumes there are in the DataNode.
@@ -334,8 +332,8 @@ public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams strea
 * @return actual state of the replica on this data-node or
 * null if data-node does not have the replica.
 */
-public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
-throws IOException;
+public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock
+) throws IOException;
 /**
 * Update replica's generation stamp and length and finalize it.
@@ -372,6 +370,7 @@ public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
 /**
 * Get {@link BlockLocalPathInfo} for the given block.
-**/
-public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) throws IOException;
+*/
+public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b
+) throws IOException;
 }
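
The renamed SPI keeps the existing plug-in mechanism: a DataNode selects its dataset implementation by pointing dfs.datanode.fsdataset.factory (DFS_DATANODE_FSDATASET_FACTORY_KEY, shown in the DFSConfigKeys hunk above) at a Factory subclass, which FsDatasetSpi.Factory.getFactory(conf) then instantiates reflectively. A minimal sketch of such a plug-in, assuming a hypothetical MyDataset class that implements FsDatasetSpi, might look like this:

    // Illustrative sketch only; MyDataset is a hypothetical FsDatasetSpi implementation.
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.hdfs.server.datanode.DataStorage;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

    public class MyDatasetFactory extends FsDatasetSpi.Factory<MyDataset> {
      @Override // invoked from DataNode.initStorage() via factory.newInstance(...)
      public MyDataset newInstance(DataNode datanode, DataStorage storage,
          Configuration conf) throws IOException {
        return new MyDataset(datanode, storage, conf);
      }
    }

Setting dfs.datanode.fsdataset.factory to the fully qualified name of MyDatasetFactory would then route the newInstance() call in DataNode.initStorage() (see the DataNode.java hunk above) to this class.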


@@ -15,16 +15,15 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
 /**
-* Rolling logs consist of a current log and a previous log.
-* When the roll operation is invoked, current is rolled to previous
-* and previous is deleted.
+* Rolling logs consist of a current log and a set of previous logs.
+*
 * The implementation should support a single appender and multiple readers.
 */
 public interface RollingLogs {
@@ -57,7 +56,7 @@ public interface Appender extends Appendable, Closeable {
 public Appender appender();
 /**
-* Roll current to previous and delete the previous.
+* Roll current to previous.
 *
 * @return true if the rolling succeeded.
 * When it returns false, it is not equivalent to an error.


@@ -15,16 +15,18 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 import java.io.IOException;
 import java.util.List;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-public class RoundRobinVolumesPolicy<V extends FsVolumeSpi>
-implements BlockVolumeChoosingPolicy<V> {
+/**
+* Choose volumes in round-robin order.
+*/
+public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
+implements VolumeChoosingPolicy<V> {
 private int curVolume = 0;
@@ -55,13 +57,10 @@ public synchronized V chooseVolume(final List<V> volumes, final long blockSize
 }
 if (curVolume == startVolume) {
-throw new DiskOutOfSpaceException(
-"Insufficient space for an additional block. Volume with the most available space has "
-+ maxAvailable
-+ " bytes free, configured block size is "
-+ blockSize);
+throw new DiskOutOfSpaceException("Out of space: "
++ "The volume with the most available space (=" + maxAvailable
++ " B) is less than the block size (=" + blockSize + " B).");
 }
 }
 }
 }
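
For reference, the renamed policy is still driven through the single chooseVolume() call; a short usage sketch (the volumes list and blockSize value are assumed to be in scope, and the call may throw DiskOutOfSpaceException as shown above):

    // Illustrative only: pick a target volume for a replica of the given size.
    VolumeChoosingPolicy<FsVolumeSpi> policy =
        new RoundRobinVolumeChoosingPolicy<FsVolumeSpi>();
    // Cycles through the volumes and returns the first one with enough space;
    // throws DiskOutOfSpaceException when even the largest volume is too small.
    FsVolumeSpi target = policy.chooseVolume(volumes, blockSize);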


@@ -15,37 +15,29 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-/**************************************************
-* BlockVolumeChoosingPolicy allows a DataNode to
-* specify what policy is to be used while choosing
-* a volume for a block request.
-*
-* Note: This is an evolving i/f and is only for
-* advanced use.
-*
-***************************************************/
+/**
+* This interface specifies the policy for choosing volumes to store replicas.
+*/
 @InterfaceAudience.Private
-public interface BlockVolumeChoosingPolicy<V extends FsVolumeSpi> {
+public interface VolumeChoosingPolicy<V extends FsVolumeSpi> {
 /**
-* Returns a specific FSVolume after applying a suitable choice algorithm
-* to place a given block, given a list of FSVolumes and the block
-* size sought for storage.
+* Choose a volume to place a replica,
+* given a list of volumes and the replica size sought for storage.
 *
-* (Policies that maintain state must be thread-safe.)
+* The implementations of this interface must be thread-safe.
 *
-* @param volumes - the array of FSVolumes that are available.
-* @param blockSize - the size of the block for which a volume is sought.
-* @return the chosen volume to store the block.
+* @param volumes - a list of available volumes.
+* @param replicaSize - the size of the replica for which a volume is sought.
+* @return the chosen volume.
 * @throws IOException when disks are unavailable or are full.
 */
-public V chooseVolume(List<V> volumes, long blockSize) throws IOException;
+public V chooseVolume(List<V> volumes, long replicaSize) throws IOException;
 }


@@ -428,15 +428,6 @@
 </description>
 </property>
-<property>
-<name>dfs.datanode.block.volume.choice.policy</name>
-<value>org.apache.hadoop.hdfs.server.datanode.RoundRobinVolumesPolicy</value>
-<description>The policy class to use to determine into which of the
-datanode's available volumes a block must be written to. Default is a simple
-round-robin policy that chooses volumes in a cyclic order.
-</description>
-</property>
 <property>
 <name>dfs.heartbeat.interval</name>
 <value>3</value>


@@ -25,8 +25,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.CreateEditsLog;
 import org.apache.hadoop.net.DNS;
@@ -125,7 +125,7 @@ public static void main(String[] args) {
 } else if (args[i].equals("-simulated")) {
 SimulatedFSDataset.setFactory(conf);
 } else if (args[i].equals("-inject")) {
-if (!FSDatasetInterface.Factory.getFactory(conf).isSimulated()) {
+if (!FsDatasetSpi.Factory.getFactory(conf).isSimulated()) {
 System.out.print("-inject is valid only for simulated");
 printUsageExit();
 }
@@ -157,8 +157,7 @@ public static void main(String[] args) {
 System.out.println("No name node address and port in config");
 System.exit(-1);
 }
-boolean simulated =
-FSDatasetInterface.Factory.getFactory(conf).isSimulated();
+boolean simulated = FsDatasetSpi.Factory.getFactory(conf).isSimulated();
 System.out.println("Starting " + numDataNodes +
 (simulated ? " Simulated " : " ") +
 " Data Nodes that will connect to Name Node at " + nameNodeAdr);


@@ -17,6 +17,29 @@
 */
 package org.apache.hadoop.hdfs;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HOSTS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import java.io.File;
@@ -43,9 +66,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocolHelper;
 import org.apache.hadoop.ha.ServiceFailedException;
@@ -57,21 +77,20 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
-import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
@@ -1802,7 +1821,7 @@ public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject) thro
 throw new IndexOutOfBoundsException();
 }
 final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
-final FSDatasetInterface<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
+final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
 if (!(dataSet instanceof SimulatedFSDataset)) {
 throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
 }
@@ -1821,7 +1840,7 @@ public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
 throw new IndexOutOfBoundsException();
 }
 final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
-final FSDatasetInterface<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
+final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
 if (!(dataSet instanceof SimulatedFSDataset)) {
 throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
 }


@@ -60,8 +60,8 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.io.IOUtils;
@@ -211,7 +211,7 @@ public void testFileCreation() throws IOException {
 // can't check capacities for real storage since the OS file system may be changing under us.
 if (simulatedStorage) {
 DataNode dn = cluster.getDataNodes().get(0);
-FSDatasetInterface<?> dataset = DataNodeTestUtils.getFSDataset(dn);
+FsDatasetSpi<?> dataset = DataNodeTestUtils.getFSDataset(dn);
 assertEquals(fileSize, dataset.getDfsUsed());
 assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize,
 dataset.getRemaining());


@@ -24,6 +24,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
/**
@@ -48,7 +49,7 @@ public class DataNodeTestUtils {
*
* @return the fsdataset that stores the blocks
*/
-public static FSDatasetInterface<?> getFSDataset(DataNode dn) {
+public static FsDatasetSpi<?> getFSDataset(DataNode dn) {
return dn.getFSDataset();
}


@@ -39,10 +39,12 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@@ -65,10 +67,10 @@
*
* Note the synchronization is coarse grained - it is at each method.
*/
-public class SimulatedFSDataset implements FSDatasetInterface<FsVolumeSpi> {
+public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
-static class Factory extends FSDatasetInterface.Factory<SimulatedFSDataset> {
+static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
@Override
-public SimulatedFSDataset createFSDatasetInterface(DataNode datanode,
+public SimulatedFSDataset newInstance(DataNode datanode,
DataStorage storage, Configuration conf) throws IOException {
return new SimulatedFSDataset(datanode, storage, conf);
}
@@ -427,7 +429,7 @@ private Map<Block, BInfo> getMap(String bpid) throws IOException {
return map;
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
@@ -437,7 +439,7 @@ public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes());
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized void unfinalizeBlock(ExtendedBlock b) {
if (isValidRbw(b)) {
blockMap.remove(b.getLocalBlock());
@@ -483,7 +485,7 @@ public int getNumFailedVolumes() {
return storage.getNumFailedVolumes();
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized long getLength(ExtendedBlock b) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
@@ -513,7 +515,7 @@ public synchronized String getReplicaString(String bpid, long blockId) {
return r == null? "null": r.toString();
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid) throws IOException {
final Map<Block, BInfo> map = blockMap.get(bpid);
if (map != null) {
@@ -526,7 +528,7 @@ public Block getStoredBlock(String bpid, long blkid) throws IOException {
return null;
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized void invalidate(String bpid, Block[] invalidBlks)
throws IOException {
boolean error = false;
@@ -557,12 +559,12 @@ private BInfo getBInfo(final ExtendedBlock b) {
return map == null? null: map.get(b.getLocalBlock());
}
-@Override // {@link FSDatasetInterface}
+@Override // {@link FsDatasetSpi}
public boolean contains(ExtendedBlock block) {
return getBInfo(block) != null;
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized boolean isValidBlock(ExtendedBlock b) {
final BInfo binfo = getBInfo(b);
return binfo != null && binfo.isFinalized();
@@ -580,7 +582,7 @@ public String toString() {
return getStorageInfo();
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
@@ -593,7 +595,7 @@ public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
return binfo;
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
@@ -611,7 +613,7 @@ public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
return binfo;
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
@@ -628,7 +630,7 @@ public void recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
map.put(binfo.theBlock, binfo);
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
@@ -647,13 +649,13 @@ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
return binfo;
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
throws IOException {
return createTemporary(b);
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
throws IOException {
if (isValidBlock(b)) {
@@ -681,7 +683,7 @@ synchronized InputStream getBlockInputStream(ExtendedBlock b
return binfo.getIStream();
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException {
InputStream result = getBlockInputStream(b);
@@ -690,13 +692,13 @@ public synchronized InputStream getBlockInputStream(ExtendedBlock b,
}
/** Not supported */
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
long ckoff) throws IOException {
throw new IOException("Not supported");
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b
) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
@@ -717,7 +719,7 @@ public void checkDataDir() throws DiskErrorException {
// nothing to check for simulated data set
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public synchronized void adjustCrcChannelPosition(ExtendedBlock b,
ReplicaOutputStreams stream,
int checksumSize)
@@ -902,32 +904,32 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
binfo.isFinalized()?ReplicaState.FINALIZED : ReplicaState.RBW);
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newlength) {
return storageId;
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public long getReplicaVisibleLength(ExtendedBlock block) {
return block.getNumBytes();
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public void addBlockPool(String bpid, Configuration conf) {
Map<Block, BInfo> map = new HashMap<Block, BInfo>();
blockMap.put(bpid, map);
storage.addBlockPool(bpid);
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public void shutdownBlockPool(String bpid) {
blockMap.remove(bpid);
storage.removeBlockPool(bpid);
}
-@Override // FSDatasetInterface
+@Override // FsDatasetSpi
public void deleteBlockPool(String bpid, boolean force) {
return;
}


@@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -77,7 +78,7 @@ public class TestBPOfferService {
private NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
private int heartbeatCounts[] = new int[2];
private DataNode mockDn;
-private FSDatasetInterface<?> mockFSDataset;
+private FsDatasetSpi<?> mockFSDataset;
@Before
public void setupMocks() throws Exception {


@@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.util.DataChecksum;
@@ -88,12 +89,12 @@ int addSomeBlocks(SimulatedFSDataset fsdataset ) throws IOException {
public void testFSDatasetFactory() {
final Configuration conf = new Configuration();
-FSDatasetInterface.Factory<?> f = FSDatasetInterface.Factory.getFactory(conf);
+FsDatasetSpi.Factory<?> f = FsDatasetSpi.Factory.getFactory(conf);
assertEquals(FSDataset.Factory.class, f.getClass());
assertFalse(f.isSimulated());
SimulatedFSDataset.setFactory(conf);
-FSDatasetInterface.Factory<?> s = FSDatasetInterface.Factory.getFactory(conf);
+FsDatasetSpi.Factory<?> s = FsDatasetSpi.Factory.getFactory(conf);
assertEquals(SimulatedFSDataset.Factory.class, s.getClass());
assertTrue(s.isSimulated());
}


@@ -15,20 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-public class TestRoundRobinVolumesPolicy {
+public class TestRoundRobinVolumeChoosingPolicy {
// Test the Round-Robin block-volume choosing algorithm.
@Test
@@ -44,9 +43,8 @@ public void testRR() throws Exception {
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
@SuppressWarnings("unchecked")
-final RoundRobinVolumesPolicy<FsVolumeSpi> policy =
-    (RoundRobinVolumesPolicy<FsVolumeSpi>)ReflectionUtils.newInstance(
-        RoundRobinVolumesPolicy.class, null);
+final RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy =
+    ReflectionUtils.newInstance(RoundRobinVolumeChoosingPolicy.class, null);
// Test two rounds of round-robin choosing
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
@@ -67,10 +65,10 @@ public void testRR() throws Exception {
}
}
-// ChooseVolume should throw DiskOutOfSpaceException with volume and block sizes in exception message.
+// ChooseVolume should throw DiskOutOfSpaceException
+// with volume and block sizes in exception message.
@Test
-public void testRRPolicyExceptionMessage()
-    throws Exception {
+public void testRRPolicyExceptionMessage() throws Exception {
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
// First volume, with 500 bytes of space.
@@ -81,18 +79,17 @@ public void testRRPolicyExceptionMessage()
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
-final RoundRobinVolumesPolicy<FsVolumeSpi> policy
-    = new RoundRobinVolumesPolicy<FsVolumeSpi>();
+final RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy
+    = new RoundRobinVolumeChoosingPolicy<FsVolumeSpi>();
int blockSize = 700;
try {
policy.chooseVolume(volumes, blockSize);
Assert.fail("expected to throw DiskOutOfSpaceException");
-} catch (DiskOutOfSpaceException e) {
+} catch(DiskOutOfSpaceException e) {
-Assert
-    .assertEquals(
-        "Not returnig the expected message",
-        "Insufficient space for an additional block. Volume with the most available space has 600 bytes free, configured block size is " + blockSize, e
-            .getMessage());
+Assert.assertEquals("Not returnig the expected message",
+    "Out of space: The volume with the most available space (=" + 600
+        + " B) is less than the block size (=" + blockSize + " B).",
+    e.getMessage());
}
}