HDFS-2887. FSVolume, which is part of the FSDatasetInterface implementation, should not be referenced outside FSDataset. A new FSVolumeInterface is defined. The BlockVolumeChoosingPolicy.chooseVolume(..) method signature is also updated. (szetszwo)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1242087 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2012-02-08 20:58:29 +00:00
parent b2d49acd08
commit b6ffb08a46
24 changed files with 374 additions and 292 deletions

CHANGES.txt

@ -201,6 +201,13 @@ Trunk (unreleased changes)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
HDFS-2887. FSVolume, which is part of the FSDatasetInterface implementation,
should not be referenced outside FSDataset. A new FSVolumeInterface is defined.
The BlockVolumeChoosingPolicy.chooseVolume(..) method signature is also
updated. (szetszwo)
NEW FEATURES NEW FEATURES
IMPROVEMENTS IMPROVEMENTS
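The entry above summarizes the refactoring: callers now obtain volumes through FSDatasetInterface.getVolumes() and program against the nested FSVolumeInterface instead of the concrete FSDataset.FSVolume. Below is a minimal, self-contained sketch of that caller-side pattern; the declarations are simplified stand-ins that mirror the accessors added later in this patch, not the real org.apache.hadoop.hdfs.server.datanode classes.

// Simplified stand-ins mirroring the interfaces added by this patch.
import java.io.File;
import java.io.IOException;
import java.util.List;

interface FSVolumeInterface {
  String[] getBlockPoolList();
  long getAvailable() throws IOException;
  File getDirectory(String bpid) throws IOException;
  File getFinalizedDir(String bpid) throws IOException;
}

interface FSDatasetInterface {
  List<FSVolumeInterface> getVolumes();
}

class VolumeWalker {
  // Sum the space still available across all volumes, touching only the interface.
  static long totalAvailable(FSDatasetInterface dataset) throws IOException {
    long available = 0L;
    for (FSVolumeInterface vol : dataset.getVolumes()) {
      available += vol.getAvailable();
    }
    return available;
  }
}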

BlockPoolSliceScanner.java

@ -46,15 +46,14 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
/** /**
* Performs two types of scanning: * Performs two types of scanning:
* <li> Gets block files from the data directories and reconciles the * <li> Gets block files from the data directories and reconciles the
* difference between the blocks on the disk and in memory in * difference between the blocks on the disk and in memory.</li>
* {@link FSDataset}</li>
* <li> Scans the data directories for block files under a block pool * <li> Scans the data directories for block files under a block pool
* and verifies that the files are not corrupt</li> * and verifies that the files are not corrupt</li>
* This keeps track of blocks and their last verification times. * This keeps track of blocks and their last verification times.
@ -78,7 +77,7 @@ class BlockPoolSliceScanner {
private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000; private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
private DataNode datanode; private DataNode datanode;
private FSDataset dataset; private final FSDatasetInterface dataset;
// sorted set // sorted set
private TreeSet<BlockScanInfo> blockInfoSet; private TreeSet<BlockScanInfo> blockInfoSet;
@ -137,8 +136,8 @@ public int compareTo(BlockScanInfo other) {
} }
} }
BlockPoolSliceScanner(DataNode datanode, FSDataset dataset, Configuration conf, BlockPoolSliceScanner(DataNode datanode, FSDatasetInterface dataset,
String bpid) { Configuration conf, String bpid) {
this.datanode = datanode; this.datanode = datanode;
this.dataset = dataset; this.dataset = dataset;
this.blockPoolId = bpid; this.blockPoolId = bpid;
@ -220,16 +219,16 @@ void init() throws IOException {
* otherwise, pick the first directory. * otherwise, pick the first directory.
*/ */
File dir = null; File dir = null;
List<FSVolume> volumes = dataset.volumes.getVolumes(); List<FSVolumeInterface> volumes = dataset.getVolumes();
for (FSDataset.FSVolume vol : dataset.volumes.getVolumes()) { for (FSVolumeInterface vol : volumes) {
File bpDir = vol.getBlockPoolSlice(blockPoolId).getDirectory(); File bpDir = vol.getDirectory(blockPoolId);
if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) { if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) {
dir = bpDir; dir = bpDir;
break; break;
} }
} }
if (dir == null) { if (dir == null) {
dir = volumes.get(0).getBlockPoolSlice(blockPoolId).getDirectory(); dir = volumes.get(0).getDirectory(blockPoolId);
} }
try { try {
@ -577,8 +576,8 @@ private synchronized void updateBytesLeft(long len) {
bytesLeft += len; bytesLeft += len;
} }
static File getCurrentFile(FSVolume vol, String bpid) throws IOException { static File getCurrentFile(FSVolumeInterface vol, String bpid) throws IOException {
return LogFileHandler.getCurrentFile(vol.getBlockPoolSlice(bpid).getDirectory(), return LogFileHandler.getCurrentFile(vol.getDirectory(bpid),
BlockPoolSliceScanner.verificationLogFile); BlockPoolSliceScanner.verificationLogFile);
} }
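The hunk above shows the scanner resolving its verification-log directory purely through the new interface. The helper below is a hypothetical restatement of that selection logic, assuming the FSVolumeInterface added by this patch; VerificationLogLocator and chooseLogDir are illustrative names, and a plain exists() check stands in for LogFileHandler.isFilePresent.

import java.io.File;
import java.io.IOException;
import java.util.List;

class VerificationLogLocator {
  static File chooseLogDir(List<FSVolumeInterface> volumes, String bpid,
      String logFileName) throws IOException {
    for (FSVolumeInterface vol : volumes) {
      File bpDir = vol.getDirectory(bpid);
      if (new File(bpDir, logFileName).exists()) {
        return bpDir;  // reuse the directory that already holds the log
      }
    }
    return volumes.get(0).getDirectory(bpid);  // otherwise default to the first volume
  }
}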

BlockVolumeChoosingPolicy.java

@ -22,7 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
/************************************************** /**************************************************
* BlockVolumeChoosingPolicy allows a DataNode to * BlockVolumeChoosingPolicy allows a DataNode to
@ -46,7 +46,7 @@ public interface BlockVolumeChoosingPolicy {
* @return the chosen volume to store the block. * @return the chosen volume to store the block.
* @throws IOException when disks are unavailable or are full. * @throws IOException when disks are unavailable or are full.
*/ */
public FSVolume chooseVolume(List<FSVolume> volumes, long blockSize) public FSVolumeInterface chooseVolume(List<FSVolumeInterface> volumes, long blockSize)
throws IOException; throws IOException;
} }
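With the updated signature, a custom policy only needs the methods exposed by FSVolumeInterface. The first-fit policy below is a hypothetical example written against the interface shown above; Hadoop's default at the time was a round-robin policy, so this is illustration only, not the project's implementation.

import java.io.IOException;
import java.util.List;

class FirstFitVolumeChoosingPolicy implements BlockVolumeChoosingPolicy {
  @Override
  public FSVolumeInterface chooseVolume(List<FSVolumeInterface> volumes,
      long blockSize) throws IOException {
    for (FSVolumeInterface vol : volumes) {
      if (vol.getAvailable() >= blockSize) {
        return vol;  // first volume with enough free space wins
      }
    }
    throw new IOException("No volume has " + blockSize + " bytes available");
  }
}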

DataBlockScanner.java

@ -27,12 +27,12 @@
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** /**
* DataBlockScanner manages block scanning for all the block pools. For each * DataBlockScanner manages block scanning for all the block pools. For each
@ -44,7 +44,7 @@
public class DataBlockScanner implements Runnable { public class DataBlockScanner implements Runnable {
public static final Log LOG = LogFactory.getLog(DataBlockScanner.class); public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
private final DataNode datanode; private final DataNode datanode;
private final FSDataset dataset; private final FSDatasetInterface dataset;
private final Configuration conf; private final Configuration conf;
/** /**
@ -55,7 +55,7 @@ public class DataBlockScanner implements Runnable {
new TreeMap<String, BlockPoolSliceScanner>(); new TreeMap<String, BlockPoolSliceScanner>();
Thread blockScannerThread = null; Thread blockScannerThread = null;
DataBlockScanner(DataNode datanode, FSDataset dataset, Configuration conf) { DataBlockScanner(DataNode datanode, FSDatasetInterface dataset, Configuration conf) {
this.datanode = datanode; this.datanode = datanode;
this.dataset = dataset; this.dataset = dataset;
this.conf = conf; this.conf = conf;
@ -135,7 +135,7 @@ private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
.iterator(); .iterator();
while (bpidIterator.hasNext()) { while (bpidIterator.hasNext()) {
String bpid = bpidIterator.next(); String bpid = bpidIterator.next();
for (FSDataset.FSVolume vol : dataset.volumes.getVolumes()) { for (FSDatasetInterface.FSVolumeInterface vol : dataset.getVolumes()) {
try { try {
File currFile = BlockPoolSliceScanner.getCurrentFile(vol, bpid); File currFile = BlockPoolSliceScanner.getCurrentFile(vol, bpid);
if (currFile.exists()) { if (currFile.exists()) {

DataNode.java

@ -126,7 +126,6 @@
import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
@ -580,11 +579,11 @@ private synchronized void initDataBlockScanner(Configuration conf) {
if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) { DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
reason = "verification is turned off by configuration"; reason = "verification is turned off by configuration";
} else if (!(data instanceof FSDataset)) { } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
reason = "verifcation is supported only with FSDataset"; reason = "verifcation is not supported by SimulatedFSDataset";
} }
if (reason == null) { if (reason == null) {
blockScanner = new DataBlockScanner(this, (FSDataset)data, conf); blockScanner = new DataBlockScanner(this, data, conf);
blockScanner.start(); blockScanner.start();
} else { } else {
LOG.info("Periodic Block Verification scan is disabled because " + LOG.info("Periodic Block Verification scan is disabled because " +
@ -609,11 +608,11 @@ private synchronized void initDirectoryScanner(Configuration conf) {
if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) { DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) {
reason = "verification is turned off by configuration"; reason = "verification is turned off by configuration";
} else if (!(data instanceof FSDataset)) { } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
reason = "verification is supported only with FSDataset"; reason = "verifcation is not supported by SimulatedFSDataset";
} }
if (reason == null) { if (reason == null) {
directoryScanner = new DirectoryScanner(this, (FSDataset) data, conf); directoryScanner = new DirectoryScanner(this, data, conf);
directoryScanner.start(); directoryScanner.start();
} else { } else {
LOG.info("Periodic Directory Tree Verification scan is disabled because " + LOG.info("Periodic Directory Tree Verification scan is disabled because " +
@ -2235,16 +2234,7 @@ public String getNamenodeAddresses() {
*/ */
@Override // DataNodeMXBean @Override // DataNodeMXBean
public String getVolumeInfo() { public String getVolumeInfo() {
final Map<String, Object> info = new HashMap<String, Object>(); return JSON.toString(data.getVolumeInfoMap());
Collection<VolumeInfo> volumes = ((FSDataset)this.data).getVolumeInfo();
for (VolumeInfo v : volumes) {
final Map<String, Object> innerInfo = new HashMap<String, Object>();
innerInfo.put("usedSpace", v.usedSpace);
innerInfo.put("freeSpace", v.freeSpace);
innerInfo.put("reservedSpace", v.reservedSpace);
info.put(v.directory, innerInfo);
}
return JSON.toString(info);
} }
@Override // DataNodeMXBean @Override // DataNodeMXBean
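getVolumeInfo() now simply serializes the map returned by the dataset. Based on the keys visible in the removed code (usedSpace, freeSpace, reservedSpace keyed by volume directory), the map has the shape sketched below; the directory path and byte counts are fabricated for illustration.

import java.util.HashMap;
import java.util.Map;

class VolumeInfoMapExample {
  static Map<String, Object> sampleVolumeInfoMap() {
    final Map<String, Object> info = new HashMap<String, Object>();
    final Map<String, Object> oneVolume = new HashMap<String, Object>();
    oneVolume.put("usedSpace", 123456789L);
    oneVolume.put("freeSpace", 987654321L);
    oneVolume.put("reservedSpace", 0L);
    info.put("/data/1/dfs/dn/current", oneVolume);  // hypothetical volume directory
    return info;  // the DataNode serializes this with JSON.toString(..), as shown above
  }
}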

DataStorage.java

@ -751,7 +751,7 @@ private static String convertMetatadataFileName(String oldFileName) {
Matcher matcher = PRE_GENSTAMP_META_FILE_PATTERN.matcher(oldFileName); Matcher matcher = PRE_GENSTAMP_META_FILE_PATTERN.matcher(oldFileName);
if (matcher.matches()) { if (matcher.matches()) {
//return the current metadata file name //return the current metadata file name
return FSDataset.getMetaFileName(matcher.group(1), return DatanodeUtil.getMetaFileName(matcher.group(1),
GenerationStamp.GRANDFATHER_GENERATION_STAMP); GenerationStamp.GRANDFATHER_GENERATION_STAMP);
} }
return oldFileName; return oldFileName;

DatanodeUtil.java

@ -18,7 +18,9 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import java.io.File; import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -26,6 +28,10 @@
/** Provide utility methods for Datanode. */ /** Provide utility methods for Datanode. */
@InterfaceAudience.Private @InterfaceAudience.Private
class DatanodeUtil { class DatanodeUtil {
static final String METADATA_EXTENSION = ".meta";
static final String UNLINK_BLOCK_SUFFIX = ".unlinked";
private final static String DISK_ERROR = "Possible disk error on file creation: "; private final static String DISK_ERROR = "Possible disk error on file creation: ";
/** Get the cause of an I/O exception if caused by a possible disk error /** Get the cause of an I/O exception if caused by a possible disk error
@ -64,4 +70,37 @@ static File createTmpFile(Block b, File f) throws IOException {
} }
return f; return f;
} }
static String getMetaFileName(String blockFileName, long genStamp) {
return blockFileName + "_" + genStamp + METADATA_EXTENSION;
}
static File getMetaFile(File f, long genStamp) {
return new File(getMetaFileName(f.getAbsolutePath(), genStamp));
}
/** Find the corresponding meta data file from a given block file */
static File findMetaFile(final File blockFile) throws IOException {
final String prefix = blockFile.getName() + "_";
final File parent = blockFile.getParentFile();
File[] matches = parent.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return dir.equals(parent)
&& name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);
}
});
if (matches == null || matches.length == 0) {
throw new IOException("Meta file not found, blockFile=" + blockFile);
}
else if (matches.length > 1) {
throw new IOException("Found more than one meta files: "
+ Arrays.asList(matches));
}
return matches[0];
}
static File getUnlinkTmpFile(File f) {
return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
}
} }
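A short usage sketch for the helpers consolidated into DatanodeUtil above. The block-file path and generation stamp are hypothetical; only the naming convention (<block>_<genstamp>.meta, <block>.unlinked) comes from the code in this patch. Since the helpers are package-private, a real caller would sit in the same org.apache.hadoop.hdfs.server.datanode package.

import java.io.File;

class MetaFileNamingDemo {
  public static void main(String[] args) {
    File blockFile = new File("/data/1/current/finalized/blk_1001");  // hypothetical
    long genStamp = 1005L;

    // -> /data/1/current/finalized/blk_1001_1005.meta
    System.out.println(DatanodeUtil.getMetaFile(blockFile, genStamp));

    // -> /data/1/current/finalized/blk_1001.unlinked
    System.out.println(DatanodeUtil.getUnlinkTmpFile(blockFile));
  }
}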

DirectoryScanner.java

@ -43,20 +43,19 @@
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
/** /**
* Periodically scans the data directories for block and block metadata files. * Periodically scans the data directories for block and block metadata files.
* Reconciles the differences with block information maintained in * Reconciles the differences with block information maintained in the dataset.
* {@link FSDataset}
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class DirectoryScanner implements Runnable { public class DirectoryScanner implements Runnable {
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class); private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
private final DataNode datanode; private final DataNode datanode;
private final FSDataset dataset; private final FSDatasetInterface dataset;
private final ExecutorService reportCompileThreadPool; private final ExecutorService reportCompileThreadPool;
private final ScheduledExecutorService masterThread; private final ScheduledExecutorService masterThread;
private final long scanPeriodMsecs; private final long scanPeriodMsecs;
@ -158,13 +157,13 @@ static class ScanInfo implements Comparable<ScanInfo> {
private final long blockId; private final long blockId;
private final File metaFile; private final File metaFile;
private final File blockFile; private final File blockFile;
private final FSVolume volume; private final FSVolumeInterface volume;
ScanInfo(long blockId) { ScanInfo(long blockId) {
this(blockId, null, null, null); this(blockId, null, null, null);
} }
ScanInfo(long blockId, File blockFile, File metaFile, FSVolume vol) { ScanInfo(long blockId, File blockFile, File metaFile, FSVolumeInterface vol) {
this.blockId = blockId; this.blockId = blockId;
this.metaFile = metaFile; this.metaFile = metaFile;
this.blockFile = blockFile; this.blockFile = blockFile;
@ -183,7 +182,7 @@ long getBlockId() {
return blockId; return blockId;
} }
FSVolume getVolume() { FSVolumeInterface getVolume() {
return volume; return volume;
} }
@ -220,7 +219,7 @@ public long getGenStamp() {
} }
} }
DirectoryScanner(DataNode dn, FSDataset dataset, Configuration conf) { DirectoryScanner(DataNode dn, FSDatasetInterface dataset, Configuration conf) {
this.datanode = dn; this.datanode = dn;
this.dataset = dataset; this.dataset = dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
@ -269,7 +268,7 @@ public void run() {
return; return;
} }
String[] bpids = dataset.getBPIdlist(); String[] bpids = dataset.getBlockPoolList();
for(String bpid : bpids) { for(String bpid : bpids) {
UpgradeManagerDatanode um = UpgradeManagerDatanode um =
datanode.getUpgradeManagerDatanode(bpid); datanode.getUpgradeManagerDatanode(bpid);
@ -411,17 +410,29 @@ private void addDifference(LinkedList<ScanInfo> diffRecord,
diffRecord.add(new ScanInfo(blockId)); diffRecord.add(new ScanInfo(blockId));
} }
/** Is the given volume still valid in the dataset? */
private static boolean isValid(final FSDatasetInterface dataset,
final FSVolumeInterface volume) {
for (FSVolumeInterface vol : dataset.getVolumes()) {
if (vol == volume) {
return true;
}
}
return false;
}
/** Get lists of blocks on the disk sorted by blockId, per blockpool */ /** Get lists of blocks on the disk sorted by blockId, per blockpool */
private Map<String, ScanInfo[]> getDiskReport() { private Map<String, ScanInfo[]> getDiskReport() {
// First get list of data directories // First get list of data directories
List<FSVolume> volumes = dataset.volumes.getVolumes(); final List<FSVolumeInterface> volumes = dataset.getVolumes();
ArrayList<ScanInfoPerBlockPool> dirReports = ArrayList<ScanInfoPerBlockPool> dirReports =
new ArrayList<ScanInfoPerBlockPool>(volumes.size()); new ArrayList<ScanInfoPerBlockPool>(volumes.size());
Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress = Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
new HashMap<Integer, Future<ScanInfoPerBlockPool>>(); new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
for (int i = 0; i < volumes.size(); i++) { for (int i = 0; i < volumes.size(); i++) {
if (!dataset.volumes.isValid(volumes.get(i))) { // volume is still valid if (!isValid(dataset, volumes.get(i))) {
// volume is invalid
dirReports.add(i, null); dirReports.add(i, null);
} else { } else {
ReportCompiler reportCompiler = ReportCompiler reportCompiler =
@ -446,7 +457,8 @@ private Map<String, ScanInfo[]> getDiskReport() {
// Compile consolidated report for all the volumes // Compile consolidated report for all the volumes
ScanInfoPerBlockPool list = new ScanInfoPerBlockPool(); ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
for (int i = 0; i < volumes.size(); i++) { for (int i = 0; i < volumes.size(); i++) {
if (dataset.volumes.isValid(volumes.get(i))) { // volume is still valid if (isValid(dataset, volumes.get(i))) {
// volume is still valid
list.addAll(dirReports.get(i)); list.addAll(dirReports.get(i));
} }
} }
@ -461,9 +473,9 @@ private static boolean isBlockMetaFile(String blockId, String metaFile) {
private static class ReportCompiler private static class ReportCompiler
implements Callable<ScanInfoPerBlockPool> { implements Callable<ScanInfoPerBlockPool> {
private FSVolume volume; private FSVolumeInterface volume;
public ReportCompiler(FSVolume volume) { public ReportCompiler(FSVolumeInterface volume) {
this.volume = volume; this.volume = volume;
} }
@ -473,14 +485,14 @@ public ScanInfoPerBlockPool call() throws Exception {
ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length); ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
for (String bpid : bpList) { for (String bpid : bpList) {
LinkedList<ScanInfo> report = new LinkedList<ScanInfo>(); LinkedList<ScanInfo> report = new LinkedList<ScanInfo>();
File bpFinalizedDir = volume.getBlockPoolSlice(bpid).getFinalizedDir(); File bpFinalizedDir = volume.getFinalizedDir(bpid);
result.put(bpid, compileReport(volume, bpFinalizedDir, report)); result.put(bpid, compileReport(volume, bpFinalizedDir, report));
} }
return result; return result;
} }
/** Compile list {@link ScanInfo} for the blocks in the directory <dir> */ /** Compile list {@link ScanInfo} for the blocks in the directory <dir> */
private LinkedList<ScanInfo> compileReport(FSVolume vol, File dir, private LinkedList<ScanInfo> compileReport(FSVolumeInterface vol, File dir,
LinkedList<ScanInfo> report) { LinkedList<ScanInfo> report) {
File[] files; File[] files;
try { try {
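The isValid(..) helper above replaces FSVolumeSet.isValid by comparing volume references by identity against the dataset's live list. Below is a simplified, self-contained sketch of the surrounding fan-out (skip removed volumes, compile one report per remaining volume in a thread pool); the types and names are stand-ins, not the Hadoop classes.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ScanFanOut {
  static List<String> compileReports(List<Object> liveVolumes, List<Object> snapshot)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, snapshot.size()));
    List<Future<String>> futures = new ArrayList<Future<String>>();
    for (final Object vol : snapshot) {
      boolean stillValid = false;
      for (Object live : liveVolumes) {
        if (live == vol) { stillValid = true; break; }  // reference identity, as in isValid(..)
      }
      if (!stillValid) {
        continue;  // volume was dropped (e.g. disk failure) since the snapshot was taken
      }
      futures.add(pool.submit(new Callable<String>() {
        public String call() {
          return "report for " + vol;  // a real compiler would walk the finalized dir
        }
      }));
    }
    List<String> reports = new ArrayList<String>();
    for (Future<String> f : futures) {
      reports.add(f.get());
    }
    pool.shutdown();
    return reports;
  }
}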

FSDataset.java

@ -23,7 +23,6 @@
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
@ -81,14 +80,13 @@ class FSDataset implements FSDatasetInterface {
* A node type that can be built into a tree reflecting the * A node type that can be built into a tree reflecting the
* hierarchy of blocks on the local disk. * hierarchy of blocks on the local disk.
*/ */
class FSDir { private class FSDir {
File dir; final File dir;
int numBlocks = 0; int numBlocks = 0;
FSDir children[]; FSDir children[];
int lastChildIdx = 0; int lastChildIdx = 0;
/**
*/ private FSDir(File dir)
public FSDir(File dir)
throws IOException { throws IOException {
this.dir = dir; this.dir = dir;
this.children = null; this.children = null;
@ -113,7 +111,7 @@ public FSDir(File dir)
} }
} }
public File addBlock(Block b, File src) throws IOException { private File addBlock(Block b, File src) throws IOException {
//First try without creating subdirectories //First try without creating subdirectories
File file = addBlock(b, src, false, false); File file = addBlock(b, src, false, false);
return (file != null) ? file : addBlock(b, src, true, true); return (file != null) ? file : addBlock(b, src, true, true);
@ -161,7 +159,7 @@ private File addBlock(Block b, File src, boolean createOk,
return children[ lastChildIdx ].addBlock(b, src, true, false); return children[ lastChildIdx ].addBlock(b, src, true, false);
} }
void getVolumeMap(String bpid, ReplicasMap volumeMap, FSVolume volume) private void getVolumeMap(String bpid, ReplicasMap volumeMap, FSVolume volume)
throws IOException { throws IOException {
if (children != null) { if (children != null) {
for (int i = 0; i < children.length; i++) { for (int i = 0; i < children.length; i++) {
@ -207,7 +205,7 @@ private void recoverTempUnlinkedBlock() throws IOException {
* check if a data directory is healthy * check if a data directory is healthy
* @throws DiskErrorException * @throws DiskErrorException
*/ */
public void checkDirTree() throws DiskErrorException { private void checkDirTree() throws DiskErrorException {
DiskChecker.checkDir(dir); DiskChecker.checkDir(dir);
if (children != null) { if (children != null) {
@ -217,7 +215,7 @@ public void checkDirTree() throws DiskErrorException {
} }
} }
void clearPath(File f) { private void clearPath(File f) {
String root = dir.getAbsolutePath(); String root = dir.getAbsolutePath();
String dir = f.getAbsolutePath(); String dir = f.getAbsolutePath();
if (dir.startsWith(root)) { if (dir.startsWith(root)) {
@ -271,6 +269,7 @@ private boolean clearPath(File f, String[] dirNames, int idx) {
return false; return false;
} }
@Override
public String toString() { public String toString() {
return "FSDir{" + return "FSDir{" +
"dir=" + dir + "dir=" + dir +
@ -284,7 +283,7 @@ public String toString() {
* Taken together, all BlockPoolSlices sharing a block pool ID across a * Taken together, all BlockPoolSlices sharing a block pool ID across a
* cluster represent a single block pool. * cluster represent a single block pool.
*/ */
class BlockPoolSlice { private class BlockPoolSlice {
private final String bpid; private final String bpid;
private final FSVolume volume; // volume to which this BlockPool belongs to private final FSVolume volume; // volume to which this BlockPool belongs to
private final File currentDir; // StorageDirectory/current/bpid/current private final File currentDir; // StorageDirectory/current/bpid/current
@ -343,10 +342,6 @@ File getDirectory() {
return currentDir.getParentFile(); return currentDir.getParentFile();
} }
File getCurrentDir() {
return currentDir;
}
File getFinalizedDir() { File getFinalizedDir() {
return finalizedDir.dir; return finalizedDir.dir;
} }
@ -387,7 +382,7 @@ File createRbwFile(Block b) throws IOException {
File addBlock(Block b, File f) throws IOException { File addBlock(Block b, File f) throws IOException {
File blockFile = finalizedDir.addBlock(b, f); File blockFile = finalizedDir.addBlock(b, f);
File metaFile = getMetaFile(blockFile , b.getGenerationStamp()); File metaFile = DatanodeUtil.getMetaFile(blockFile, b.getGenerationStamp());
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length()); dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
return blockFile; return blockFile;
} }
@ -455,7 +450,7 @@ private long validateIntegrity(File blockFile, long genStamp) {
DataInputStream checksumIn = null; DataInputStream checksumIn = null;
InputStream blockIn = null; InputStream blockIn = null;
try { try {
File metaFile = new File(getMetaFileName(blockFile.toString(), genStamp)); final File metaFile = DatanodeUtil.getMetaFile(blockFile, genStamp);
long blockFileLen = blockFile.length(); long blockFileLen = blockFile.length();
long metaFileLen = metaFile.length(); long metaFileLen = metaFile.length();
int crcHeaderLen = DataChecksum.getChecksumHeaderSize(); int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
@ -521,7 +516,7 @@ public void shutdown() {
} }
} }
class FSVolume { class FSVolume implements FSVolumeInterface {
private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>(); private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>();
private final File currentDir; // <StorageDirectory>/current private final File currentDir; // <StorageDirectory>/current
private final DF usage; private final DF usage;
@ -535,11 +530,6 @@ class FSVolume {
this.usage = new DF(parent, conf); this.usage = new DF(parent, conf);
} }
/** Return storage directory corresponding to the volume */
File getDir() {
return currentDir.getParentFile();
}
File getCurrentDir() { File getCurrentDir() {
return currentDir; return currentDir;
} }
@ -584,7 +574,8 @@ long getCapacity() {
return remaining > 0 ? remaining : 0; return remaining > 0 ? remaining : 0;
} }
long getAvailable() throws IOException { @Override
public long getAvailable() throws IOException {
long remaining = getCapacity()-getDfsUsed(); long remaining = getCapacity()-getDfsUsed();
long available = usage.getAvailable(); long available = usage.getAvailable();
if (remaining>available) { if (remaining>available) {
@ -601,7 +592,7 @@ String getMount() throws IOException {
return usage.getMount(); return usage.getMount();
} }
BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException { private BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
BlockPoolSlice bp = map.get(bpid); BlockPoolSlice bp = map.get(bpid);
if (bp == null) { if (bp == null) {
throw new IOException("block pool " + bpid + " is not found"); throw new IOException("block pool " + bpid + " is not found");
@ -609,10 +600,21 @@ BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
return bp; return bp;
} }
@Override
public File getDirectory(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getDirectory();
}
@Override
public File getFinalizedDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getFinalizedDir();
}
/** /**
* Make a deep copy of the list of currently active BPIDs * Make a deep copy of the list of currently active BPIDs
*/ */
String[] getBlockPoolList() { @Override
public String[] getBlockPoolList() {
synchronized(FSDataset.this) { synchronized(FSDataset.this) {
return map.keySet().toArray(new String[map.keySet().size()]); return map.keySet().toArray(new String[map.keySet().size()]);
} }
@ -682,6 +684,7 @@ void clearPath(String bpid, File f) throws IOException {
bp.clearPath(f); bp.clearPath(f);
} }
@Override
public String toString() { public String toString() {
return currentDir.getAbsolutePath(); return currentDir.getAbsolutePath();
} }
@ -773,21 +776,18 @@ static class FSVolumeSet {
* Read access to this unmodifiable list is not synchronized. * Read access to this unmodifiable list is not synchronized.
* This list is replaced on modification holding "this" lock. * This list is replaced on modification holding "this" lock.
*/ */
private volatile List<FSVolume> volumes = null; private volatile List<FSVolumeInterface> volumes = null;
BlockVolumeChoosingPolicy blockChooser; BlockVolumeChoosingPolicy blockChooser;
int numFailedVolumes; int numFailedVolumes;
FSVolumeSet(FSVolume[] volumes, int failedVols, BlockVolumeChoosingPolicy blockChooser) { FSVolumeSet(List<FSVolumeInterface> volumes, int failedVols,
List<FSVolume> list = Arrays.asList(volumes); BlockVolumeChoosingPolicy blockChooser) {
this.volumes = Collections.unmodifiableList(list); this.volumes = Collections.unmodifiableList(volumes);
this.blockChooser = blockChooser; this.blockChooser = blockChooser;
this.numFailedVolumes = failedVols; this.numFailedVolumes = failedVols;
} }
private int numberOfVolumes() {
return volumes.size();
}
private int numberOfFailedVolumes() { private int numberOfFailedVolumes() {
return numFailedVolumes; return numFailedVolumes;
} }
@ -800,36 +800,36 @@ private int numberOfFailedVolumes() {
* @return next volume to store the block in. * @return next volume to store the block in.
*/ */
synchronized FSVolume getNextVolume(long blockSize) throws IOException { synchronized FSVolume getNextVolume(long blockSize) throws IOException {
return blockChooser.chooseVolume(volumes, blockSize); return (FSVolume)blockChooser.chooseVolume(volumes, blockSize);
} }
private long getDfsUsed() throws IOException { private long getDfsUsed() throws IOException {
long dfsUsed = 0L; long dfsUsed = 0L;
for (FSVolume vol : volumes) { for (FSVolumeInterface v : volumes) {
dfsUsed += vol.getDfsUsed(); dfsUsed += ((FSVolume)v).getDfsUsed();
} }
return dfsUsed; return dfsUsed;
} }
private long getBlockPoolUsed(String bpid) throws IOException { private long getBlockPoolUsed(String bpid) throws IOException {
long dfsUsed = 0L; long dfsUsed = 0L;
for (FSVolume vol : volumes) { for (FSVolumeInterface v : volumes) {
dfsUsed += vol.getBlockPoolUsed(bpid); dfsUsed += ((FSVolume)v).getBlockPoolUsed(bpid);
} }
return dfsUsed; return dfsUsed;
} }
private long getCapacity() { private long getCapacity() {
long capacity = 0L; long capacity = 0L;
for (FSVolume vol : volumes) { for (FSVolumeInterface v : volumes) {
capacity += vol.getCapacity(); capacity += ((FSVolume)v).getCapacity();
} }
return capacity; return capacity;
} }
private long getRemaining() throws IOException { private long getRemaining() throws IOException {
long remaining = 0L; long remaining = 0L;
for (FSVolume vol : volumes) { for (FSVolumeInterface vol : volumes) {
remaining += vol.getAvailable(); remaining += vol.getAvailable();
} }
return remaining; return remaining;
@ -837,15 +837,15 @@ private long getRemaining() throws IOException {
private void getVolumeMap(ReplicasMap volumeMap) private void getVolumeMap(ReplicasMap volumeMap)
throws IOException { throws IOException {
for (FSVolume vol : volumes) { for (FSVolumeInterface v : volumes) {
vol.getVolumeMap(volumeMap); ((FSVolume)v).getVolumeMap(volumeMap);
} }
} }
private void getVolumeMap(String bpid, ReplicasMap volumeMap) private void getVolumeMap(String bpid, ReplicasMap volumeMap)
throws IOException { throws IOException {
for (FSVolume vol : volumes) { for (FSVolumeInterface v : volumes) {
vol.getVolumeMap(bpid, volumeMap); ((FSVolume)v).getVolumeMap(bpid, volumeMap);
} }
} }
@ -861,10 +861,10 @@ private synchronized List<FSVolume> checkDirs() {
ArrayList<FSVolume> removedVols = null; ArrayList<FSVolume> removedVols = null;
// Make a copy of volumes for performing modification // Make a copy of volumes for performing modification
List<FSVolume> volumeList = new ArrayList<FSVolume>(getVolumes()); final List<FSVolumeInterface> volumeList = new ArrayList<FSVolumeInterface>(volumes);
for (int idx = 0; idx < volumeList.size(); idx++) { for (int idx = 0; idx < volumeList.size(); idx++) {
FSVolume fsv = volumeList.get(idx); FSVolume fsv = (FSVolume)volumeList.get(idx);
try { try {
fsv.checkDirs(); fsv.checkDirs();
} catch (DiskErrorException e) { } catch (DiskErrorException e) {
@ -881,8 +881,8 @@ private synchronized List<FSVolume> checkDirs() {
// Remove null volumes from the volumes array // Remove null volumes from the volumes array
if (removedVols != null && removedVols.size() > 0) { if (removedVols != null && removedVols.size() > 0) {
List<FSVolume> newVols = new ArrayList<FSVolume>(); List<FSVolumeInterface> newVols = new ArrayList<FSVolumeInterface>();
for (FSVolume vol : volumeList) { for (FSVolumeInterface vol : volumeList) {
if (vol != null) { if (vol != null) {
newVols.add(vol); newVols.add(vol);
} }
@ -896,43 +896,29 @@ private synchronized List<FSVolume> checkDirs() {
return removedVols; return removedVols;
} }
@Override
public String toString() { public String toString() {
return volumes.toString(); return volumes.toString();
} }
boolean isValid(FSVolume volume) {
for (FSVolume vol : volumes) {
if (vol == volume) {
return true;
}
}
return false;
}
private void addBlockPool(String bpid, Configuration conf) private void addBlockPool(String bpid, Configuration conf)
throws IOException { throws IOException {
for (FSVolume v : volumes) { for (FSVolumeInterface v : volumes) {
v.addBlockPool(bpid, conf); ((FSVolume)v).addBlockPool(bpid, conf);
} }
} }
private void removeBlockPool(String bpid) { private void removeBlockPool(String bpid) {
for (FSVolume v : volumes) { for (FSVolumeInterface v : volumes) {
v.shutdownBlockPool(bpid); ((FSVolume)v).shutdownBlockPool(bpid);
} }
} }
/**
* @return unmodifiable list of volumes
*/
public List<FSVolume> getVolumes() {
return volumes;
}
private void shutdown() { private void shutdown() {
for (FSVolume volume : volumes) { for (FSVolumeInterface volume : volumes) {
if(volume != null) { if(volume != null) {
volume.shutdown(); ((FSVolume)volume).shutdown();
} }
} }
} }
@ -944,35 +930,20 @@ private void shutdown() {
// //
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
//Find better place?
static final String METADATA_EXTENSION = ".meta";
static final String UNLINK_BLOCK_SUFFIX = ".unlinked";
private static boolean isUnlinkTmpFile(File f) { private static boolean isUnlinkTmpFile(File f) {
String name = f.getName(); String name = f.getName();
return name.endsWith(UNLINK_BLOCK_SUFFIX); return name.endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX);
}
static File getUnlinkTmpFile(File f) {
return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
} }
private static File getOrigFile(File unlinkTmpFile) { private static File getOrigFile(File unlinkTmpFile) {
String fileName = unlinkTmpFile.getName(); String fileName = unlinkTmpFile.getName();
return new File(unlinkTmpFile.getParentFile(), return new File(unlinkTmpFile.getParentFile(),
fileName.substring(0, fileName.length()-UNLINK_BLOCK_SUFFIX.length())); fileName.substring(0,
} fileName.length() - DatanodeUtil.UNLINK_BLOCK_SUFFIX.length()));
static String getMetaFileName(String blockFileName, long genStamp) {
return blockFileName + "_" + genStamp + METADATA_EXTENSION;
}
static File getMetaFile(File f , long genStamp) {
return new File(getMetaFileName(f.getAbsolutePath(), genStamp));
} }
protected File getMetaFile(ExtendedBlock b) throws IOException { protected File getMetaFile(ExtendedBlock b) throws IOException {
return getMetaFile(getBlockFile(b), b.getGenerationStamp()); return DatanodeUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp());
} }
/** Find the metadata file for the specified block file. /** Find the metadata file for the specified block file.
@ -995,33 +966,12 @@ private static long getGenerationStampFromFile(File[] listdir, File blockFile) {
return GenerationStamp.GRANDFATHER_GENERATION_STAMP; return GenerationStamp.GRANDFATHER_GENERATION_STAMP;
} }
/** Find the corresponding meta data file from a given block file */
private static File findMetaFile(final File blockFile) throws IOException {
final String prefix = blockFile.getName() + "_";
final File parent = blockFile.getParentFile();
File[] matches = parent.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return dir.equals(parent)
&& name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);
}
});
if (matches == null || matches.length == 0) {
throw new IOException("Meta file not found, blockFile=" + blockFile);
}
else if (matches.length > 1) {
throw new IOException("Found more than one meta files: "
+ Arrays.asList(matches));
}
return matches[0];
}
/** Find the corresponding meta data file from a given block file */ /** Find the corresponding meta data file from a given block file */
private static long parseGenerationStamp(File blockFile, File metaFile private static long parseGenerationStamp(File blockFile, File metaFile
) throws IOException { ) throws IOException {
String metaname = metaFile.getName(); String metaname = metaFile.getName();
String gs = metaname.substring(blockFile.getName().length() + 1, String gs = metaname.substring(blockFile.getName().length() + 1,
metaname.length() - METADATA_EXTENSION.length()); metaname.length() - DatanodeUtil.METADATA_EXTENSION.length());
try { try {
return Long.parseLong(gs); return Long.parseLong(gs);
} catch(NumberFormatException nfe) { } catch(NumberFormatException nfe) {
@ -1030,6 +980,11 @@ private static long parseGenerationStamp(File blockFile, File metaFile
} }
} }
@Override // FSDatasetInterface
public List<FSVolumeInterface> getVolumes() {
return volumes.volumes;
}
@Override // FSDatasetInterface @Override // FSDatasetInterface
public synchronized Block getStoredBlock(String bpid, long blkid) public synchronized Block getStoredBlock(String bpid, long blkid)
throws IOException { throws IOException {
@ -1037,7 +992,7 @@ public synchronized Block getStoredBlock(String bpid, long blkid)
if (blockfile == null) { if (blockfile == null) {
return null; return null;
} }
File metafile = findMetaFile(blockfile); final File metafile = DatanodeUtil.findMetaFile(blockfile);
return new Block(blkid, blockfile.length(), return new Block(blkid, blockfile.length(),
parseGenerationStamp(blockfile, metafile)); parseGenerationStamp(blockfile, metafile));
} }
@ -1101,7 +1056,7 @@ public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
/** /**
* An FSDataset has a directory where it loads its data files. * An FSDataset has a directory where it loads its data files.
*/ */
public FSDataset(DataNode datanode, DataStorage storage, Configuration conf) FSDataset(DataNode datanode, DataStorage storage, Configuration conf)
throws IOException { throws IOException {
this.datanode = datanode; this.datanode = datanode;
this.maxBlocksPerDir = this.maxBlocksPerDir =
@ -1134,12 +1089,12 @@ public FSDataset(DataNode datanode, DataStorage storage, Configuration conf)
+ ", volume failures tolerated: " + volFailuresTolerated); + ", volume failures tolerated: " + volFailuresTolerated);
} }
FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()]; final List<FSVolumeInterface> volArray = new ArrayList<FSVolumeInterface>(
storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), final File dir = storage.getStorageDir(idx).getCurrentDir();
conf); volArray.add(new FSVolume(dir, conf));
DataNode.LOG.info("FSDataset added volume - " DataNode.LOG.info("FSDataset added volume - " + dir);
+ storage.getStorageDir(idx).getCurrentDir());
} }
volumeMap = new ReplicasMap(this); volumeMap = new ReplicasMap(this);
@ -1185,7 +1140,7 @@ public long getBlockPoolUsed(String bpid) throws IOException {
*/ */
@Override // FSDatasetInterface @Override // FSDatasetInterface
public boolean hasEnoughResource() { public boolean hasEnoughResource() {
return volumes.numberOfVolumes() >= validVolsRequired; return getVolumes().size() >= validVolsRequired;
} }
/** /**
@ -1368,8 +1323,8 @@ public boolean unlinkBlock(ExtendedBlock block, int numLinks) throws IOException
private static File moveBlockFiles(Block b, File srcfile, File destdir private static File moveBlockFiles(Block b, File srcfile, File destdir
) throws IOException { ) throws IOException {
final File dstfile = new File(destdir, b.getBlockName()); final File dstfile = new File(destdir, b.getBlockName());
final File srcmeta = getMetaFile(srcfile, b.getGenerationStamp()); final File srcmeta = DatanodeUtil.getMetaFile(srcfile, b.getGenerationStamp());
final File dstmeta = getMetaFile(dstfile, b.getGenerationStamp()); final File dstmeta = DatanodeUtil.getMetaFile(dstfile, b.getGenerationStamp());
if (!srcmeta.renameTo(dstmeta)) { if (!srcmeta.renameTo(dstmeta)) {
throw new IOException("Failed to move meta file for " + b throw new IOException("Failed to move meta file for " + b
+ " from " + srcmeta + " to " + dstmeta); + " from " + srcmeta + " to " + dstmeta);
@ -1487,7 +1442,7 @@ private synchronized ReplicaBeingWritten append(String bpid,
// construct a RBW replica with the new GS // construct a RBW replica with the new GS
File blkfile = replicaInfo.getBlockFile(); File blkfile = replicaInfo.getBlockFile();
FSVolume v = replicaInfo.getVolume(); FSVolume v = (FSVolume)replicaInfo.getVolume();
if (v.getAvailable() < estimateBlockLen - replicaInfo.getNumBytes()) { if (v.getAvailable() < estimateBlockLen - replicaInfo.getNumBytes()) {
throw new DiskOutOfSpaceException("Insufficient space for appending to " throw new DiskOutOfSpaceException("Insufficient space for appending to "
+ replicaInfo); + replicaInfo);
@ -1744,7 +1699,7 @@ public synchronized ReplicaInPipelineInterface convertTemporaryToRbw(
+ visible + ", temp=" + temp); + visible + ", temp=" + temp);
} }
// check volume // check volume
final FSVolume v = temp.getVolume(); final FSVolume v = (FSVolume)temp.getVolume();
if (v == null) { if (v == null) {
throw new IOException("r.getVolume() = null, temp=" + temp); throw new IOException("r.getVolume() = null, temp=" + temp);
} }
@ -1805,7 +1760,7 @@ synchronized File createTmpFile(FSVolume vol, String bpid, Block blk) throws IOE
if ( vol == null ) { if ( vol == null ) {
ReplicaInfo replica = volumeMap.get(bpid, blk); ReplicaInfo replica = volumeMap.get(bpid, blk);
if (replica != null) { if (replica != null) {
vol = volumeMap.get(bpid, blk).getVolume(); vol = (FSVolume)volumeMap.get(bpid, blk).getVolume();
} }
if ( vol == null ) { if ( vol == null ) {
throw new IOException("Could not find volume for block " + blk); throw new IOException("Could not find volume for block " + blk);
@ -1845,7 +1800,7 @@ private synchronized FinalizedReplica finalizeReplica(String bpid,
newReplicaInfo = (FinalizedReplica) newReplicaInfo = (FinalizedReplica)
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica(); ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
} else { } else {
FSVolume v = replicaInfo.getVolume(); FSVolume v = (FSVolume)replicaInfo.getVolume();
File f = replicaInfo.getBlockFile(); File f = replicaInfo.getBlockFile();
if (v == null) { if (v == null) {
throw new IOException("No volume for temporary file " + f + throw new IOException("No volume for temporary file " + f +
@ -1943,7 +1898,8 @@ public BlockListAsLongs getBlockReport(String bpid) {
/** /**
* Get the list of finalized blocks from in-memory blockmap for a block pool. * Get the list of finalized blocks from in-memory blockmap for a block pool.
*/ */
synchronized List<Block> getFinalizedBlocks(String bpid) { @Override
public synchronized List<Block> getFinalizedBlocks(String bpid) {
ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size(bpid)); ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) { for (ReplicaInfo b : volumeMap.replicas(bpid)) {
if(b.getState() == ReplicaState.FINALIZED) { if(b.getState() == ReplicaState.FINALIZED) {
@ -2016,7 +1972,7 @@ static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
} }
//check replica's meta file //check replica's meta file
final File metafile = getMetaFile(f, r.getGenerationStamp()); final File metafile = DatanodeUtil.getMetaFile(f, r.getGenerationStamp());
if (!metafile.exists()) { if (!metafile.exists()) {
throw new IOException("Metafile " + metafile + " does not exist, r=" + r); throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
} }
@ -2047,7 +2003,7 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
error = true; error = true;
continue; continue;
} }
v = dinfo.getVolume(); v = (FSVolume)dinfo.getVolume();
if (f == null) { if (f == null) {
DataNode.LOG.warn("Unexpected error trying to delete block " DataNode.LOG.warn("Unexpected error trying to delete block "
+ invalidBlks[i] + + invalidBlks[i] +
@ -2081,7 +2037,7 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
} }
volumeMap.remove(bpid, invalidBlks[i]); volumeMap.remove(bpid, invalidBlks[i]);
} }
File metaFile = getMetaFile(f, invalidBlks[i].getGenerationStamp()); File metaFile = DatanodeUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp());
// Delete the block asynchronously to make sure we can do it fast enough // Delete the block asynchronously to make sure we can do it fast enough
asyncDiskService.deleteAsync(v, f, metaFile, asyncDiskService.deleteAsync(v, f, metaFile,
@ -2238,8 +2194,9 @@ public String getStorageInfo() {
* @param diskMetaFile Metadata file from on the disk * @param diskMetaFile Metadata file from on the disk
* @param vol Volume of the block file * @param vol Volume of the block file
*/ */
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile, public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FSVolume vol) { File diskMetaFile, FSVolumeInterface vol) {
Block corruptBlock = null; Block corruptBlock = null;
ReplicaInfo memBlockInfo; ReplicaInfo memBlockInfo;
synchronized (this) { synchronized (this) {
@ -2327,7 +2284,7 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile,
// Compare generation stamp // Compare generation stamp
if (memBlockInfo.getGenerationStamp() != diskGS) { if (memBlockInfo.getGenerationStamp() != diskGS) {
File memMetaFile = getMetaFile(diskFile, File memMetaFile = DatanodeUtil.getMetaFile(diskFile,
memBlockInfo.getGenerationStamp()); memBlockInfo.getGenerationStamp());
if (memMetaFile.exists()) { if (memMetaFile.exists()) {
if (memMetaFile.compareTo(diskMetaFile) != 0) { if (memMetaFile.compareTo(diskMetaFile) != 0) {
@ -2562,18 +2519,15 @@ public synchronized void shutdownBlockPool(String bpid) {
volumes.removeBlockPool(bpid); volumes.removeBlockPool(bpid);
} }
/** @Override
* get list of all bpids public String[] getBlockPoolList() {
* @return list of bpids
*/
public String [] getBPIdlist() {
return volumeMap.getBlockPoolList(); return volumeMap.getBlockPoolList();
} }
/** /**
* Class for representing the Datanode volume information * Class for representing the Datanode volume information
*/ */
static class VolumeInfo { private static class VolumeInfo {
final String directory; final String directory;
final long usedSpace; final long usedSpace;
final long freeSpace; final long freeSpace;
@ -2587,9 +2541,10 @@ static class VolumeInfo {
} }
} }
Collection<VolumeInfo> getVolumeInfo() { private Collection<VolumeInfo> getVolumeInfo() {
Collection<VolumeInfo> info = new ArrayList<VolumeInfo>(); Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
for (FSVolume volume : volumes.volumes) { for (FSVolumeInterface v : volumes.volumes) {
final FSVolume volume = (FSVolume)v;
long used = 0; long used = 0;
long free = 0; long free = 0;
try { try {
@ -2607,12 +2562,26 @@ Collection<VolumeInfo> getVolumeInfo() {
return info; return info;
} }
@Override
public Map<String, Object> getVolumeInfoMap() {
final Map<String, Object> info = new HashMap<String, Object>();
Collection<VolumeInfo> volumes = getVolumeInfo();
for (VolumeInfo v : volumes) {
final Map<String, Object> innerInfo = new HashMap<String, Object>();
innerInfo.put("usedSpace", v.usedSpace);
innerInfo.put("freeSpace", v.freeSpace);
innerInfo.put("reservedSpace", v.reservedSpace);
info.put(v.directory, innerInfo);
}
return info;
}
@Override //FSDatasetInterface @Override //FSDatasetInterface
public synchronized void deleteBlockPool(String bpid, boolean force) public synchronized void deleteBlockPool(String bpid, boolean force)
throws IOException { throws IOException {
if (!force) { if (!force) {
for (FSVolume volume : volumes.volumes) { for (FSVolumeInterface volume : volumes.volumes) {
if (!volume.isBPDirEmpty(bpid)) { if (!((FSVolume)volume).isBPDirEmpty(bpid)) {
DataNode.LOG.warn(bpid DataNode.LOG.warn(bpid
+ " has some block files, cannot delete unless forced"); + " has some block files, cannot delete unless forced");
throw new IOException("Cannot delete block pool, " throw new IOException("Cannot delete block pool, "
@ -2620,8 +2589,8 @@ public synchronized void deleteBlockPool(String bpid, boolean force)
} }
} }
} }
for (FSVolume volume : volumes.volumes) { for (FSVolumeInterface volume : volumes.volumes) {
volume.deleteBPDirectories(bpid, force); ((FSVolume)volume).deleteBPDirectories(bpid, force);
} }
} }
@ -2629,7 +2598,7 @@ public synchronized void deleteBlockPool(String bpid, boolean force)
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException { throws IOException {
File datafile = getBlockFile(block); File datafile = getBlockFile(block);
File metafile = getMetaFile(datafile, block.getGenerationStamp()); File metafile = DatanodeUtil.getMetaFile(datafile, block.getGenerationStamp());
BlockLocalPathInfo info = new BlockLocalPathInfo(block, BlockLocalPathInfo info = new BlockLocalPathInfo(block,
datafile.getAbsolutePath(), metafile.getAbsolutePath()); datafile.getAbsolutePath(), metafile.getAbsolutePath());
return info; return info;
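Inside FSDataset the volume list is now typed as List<FSVolumeInterface>, so implementation-only methods are reached by casting back to FSVolume, as seen throughout FSVolumeSet above. A self-contained toy example of that pattern with stand-in types (LocalVolume stands in for FSDataset.FSVolume, Volume for FSVolumeInterface):

import java.util.Arrays;
import java.util.List;

interface Volume {
  long getAvailable();
}

class LocalVolume implements Volume {
  public long getAvailable() { return 1024L; }
  long getDfsUsed() { return 2048L; }  // implementation-only method, not on the interface
}

class DowncastDemo {
  static long totalDfsUsed(List<Volume> volumes) {
    long used = 0L;
    for (Volume v : volumes) {
      used += ((LocalVolume) v).getDfsUsed();  // safe only for volumes this dataset created
    }
    return used;
  }

  public static void main(String[] args) {
    List<Volume> volumes = Arrays.<Volume>asList(new LocalVolume(), new LocalVolume());
    System.out.println(totalDfsUsed(volumes));  // prints 4096
  }
}

The cost is a handful of internal casts; the benefit, visible in the DataNode and scanner hunks above, is that DataBlockScanner and DirectoryScanner depend only on FSDatasetInterface.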

FSDatasetInterface.java

@ -19,10 +19,13 @@
import java.io.Closeable; import java.io.Closeable;
import java.io.File;
import java.io.FilterInputStream; import java.io.FilterInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -46,7 +49,43 @@
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface FSDatasetInterface extends FSDatasetMBean { public interface FSDatasetInterface extends FSDatasetMBean {
/**
* This is an interface for the underlying volume.
* @see org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume
*/
interface FSVolumeInterface {
/** @return a list of block pools. */
public String[] getBlockPoolList();
/** @return the available storage space in bytes. */
public long getAvailable() throws IOException;
/** @return the directory for the block pool. */
public File getDirectory(String bpid) throws IOException;
/** @return the directory for the finalized blocks in the block pool. */
public File getFinalizedDir(String bpid) throws IOException;
}
/** @return a list of volumes. */
public List<FSVolumeInterface> getVolumes();
/** @return a volume information map (name => info). */
public Map<String, Object> getVolumeInfoMap();
/** @return a list of block pools. */
public String[] getBlockPoolList();
/** @return a list of finalized blocks for the given block pool. */
public List<Block> getFinalizedBlocks(String bpid);
/**
* Check whether the in-memory block record matches the block on the disk,
* and, in case that they are not matched, update the record or mark it
* as corrupted.
*/
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FSVolumeInterface vol);
/** /**
* Returns the length of the metadata file of the specified block * Returns the length of the metadata file of the specified block
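For reference, a hypothetical minimal implementation of the four FSVolumeInterface methods declared above, backed by plain directories. The <bpid>/current/finalized layout is a simplification of the block-pool structure used elsewhere in this patch, and the class itself is only an illustration, not a replacement for FSDataset.FSVolume.

import java.io.File;
import java.io.IOException;

class SimpleVolume implements FSDatasetInterface.FSVolumeInterface {
  private final File root;        // e.g. <StorageDirectory>/current
  private final String[] bpids;   // block pools hosted by this volume

  SimpleVolume(File root, String... bpids) {
    this.root = root;
    this.bpids = bpids;
  }

  @Override
  public String[] getBlockPoolList() {
    return bpids.clone();
  }

  @Override
  public long getAvailable() throws IOException {
    return root.getUsableSpace();
  }

  @Override
  public File getDirectory(String bpid) throws IOException {
    return new File(root, bpid);
  }

  @Override
  public File getFinalizedDir(String bpid) throws IOException {
    return new File(getDirectory(bpid), "current" + File.separator + "finalized");
  }
}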

FinalizedReplica.java

@ -21,7 +21,7 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
/** /**
* This class describes a replica that has been finalized. * This class describes a replica that has been finalized.
@ -38,7 +38,7 @@ class FinalizedReplica extends ReplicaInfo {
* @param dir directory path where block and meta files are located * @param dir directory path where block and meta files are located
*/ */
FinalizedReplica(long blockId, long len, long genStamp, FinalizedReplica(long blockId, long len, long genStamp,
FSVolume vol, File dir) { FSVolumeInterface vol, File dir) {
super(blockId, len, genStamp, vol, dir); super(blockId, len, genStamp, vol, dir);
} }
@ -48,7 +48,7 @@ class FinalizedReplica extends ReplicaInfo {
* @param vol volume where replica is located * @param vol volume where replica is located
* @param dir directory path where block and meta files are located * @param dir directory path where block and meta files are located
*/ */
FinalizedReplica(Block block, FSVolume vol, File dir) { FinalizedReplica(Block block, FSVolumeInterface vol, File dir) {
super(block, vol, dir); super(block, vol, dir);
} }

ReplicaBeingWritten.java

@ -21,7 +21,7 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
/** This class represents replicas being written. /** This class represents replicas being written.
* Those are the replicas that * Those are the replicas that
@ -36,7 +36,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
* @param dir directory path where block and meta files are located * @param dir directory path where block and meta files are located
*/ */
ReplicaBeingWritten(long blockId, long genStamp, ReplicaBeingWritten(long blockId, long genStamp,
FSVolume vol, File dir) { FSVolumeInterface vol, File dir) {
super( blockId, genStamp, vol, dir); super( blockId, genStamp, vol, dir);
} }
@ -48,7 +48,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
* @param writer a thread that is writing to this replica * @param writer a thread that is writing to this replica
*/ */
ReplicaBeingWritten(Block block, ReplicaBeingWritten(Block block,
FSVolume vol, File dir, Thread writer) { FSVolumeInterface vol, File dir, Thread writer) {
super( block, vol, dir, writer); super( block, vol, dir, writer);
} }
@ -62,7 +62,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
* @param writer a thread that is writing to this replica * @param writer a thread that is writing to this replica
*/ */
ReplicaBeingWritten(long blockId, long len, long genStamp, ReplicaBeingWritten(long blockId, long len, long genStamp,
FSVolume vol, File dir, Thread writer ) { FSVolumeInterface vol, File dir, Thread writer ) {
super( blockId, len, genStamp, vol, dir, writer); super( blockId, len, genStamp, vol, dir, writer);
} }

View File

@ -24,8 +24,8 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -53,7 +53,7 @@ class ReplicaInPipeline extends ReplicaInfo
* @param state replica state * @param state replica state
*/ */
ReplicaInPipeline(long blockId, long genStamp, ReplicaInPipeline(long blockId, long genStamp,
FSVolume vol, File dir) { FSVolumeInterface vol, File dir) {
this( blockId, 0L, genStamp, vol, dir, Thread.currentThread()); this( blockId, 0L, genStamp, vol, dir, Thread.currentThread());
} }
@ -65,7 +65,7 @@ class ReplicaInPipeline extends ReplicaInfo
* @param writer a thread that is writing to this replica * @param writer a thread that is writing to this replica
*/ */
ReplicaInPipeline(Block block, ReplicaInPipeline(Block block,
FSVolume vol, File dir, Thread writer) { FSVolumeInterface vol, File dir, Thread writer) {
this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(), this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
vol, dir, writer); vol, dir, writer);
} }
@ -80,7 +80,7 @@ class ReplicaInPipeline extends ReplicaInfo
* @param writer a thread that is writing to this replica * @param writer a thread that is writing to this replica
*/ */
ReplicaInPipeline(long blockId, long len, long genStamp, ReplicaInPipeline(long blockId, long len, long genStamp,
FSVolume vol, File dir, Thread writer ) { FSVolumeInterface vol, File dir, Thread writer ) {
super( blockId, len, genStamp, vol, dir); super( blockId, len, genStamp, vol, dir);
this.bytesAcked = len; this.bytesAcked = len;
this.bytesOnDisk = len; this.bytesOnDisk = len;

View File

@ -26,7 +26,7 @@
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
/** /**
@ -35,8 +35,10 @@
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
abstract public class ReplicaInfo extends Block implements Replica { abstract public class ReplicaInfo extends Block implements Replica {
private FSVolume volume; // volume where the replica belongs /** volume where the replica belongs */
private File dir; // directory where block & meta files belong private FSVolumeInterface volume;
/** directory where block & meta files belong */
private File dir;
/** /**
* Constructor for a zero length replica * Constructor for a zero length replica
@ -45,7 +47,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
* @param vol volume where replica is located * @param vol volume where replica is located
* @param dir directory path where block and meta files are located * @param dir directory path where block and meta files are located
*/ */
ReplicaInfo(long blockId, long genStamp, FSVolume vol, File dir) { ReplicaInfo(long blockId, long genStamp, FSVolumeInterface vol, File dir) {
this( blockId, 0L, genStamp, vol, dir); this( blockId, 0L, genStamp, vol, dir);
} }
@ -55,7 +57,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
* @param vol volume where replica is located * @param vol volume where replica is located
* @param dir directory path where block and meta files are located * @param dir directory path where block and meta files are located
*/ */
ReplicaInfo(Block block, FSVolume vol, File dir) { ReplicaInfo(Block block, FSVolumeInterface vol, File dir) {
this(block.getBlockId(), block.getNumBytes(), this(block.getBlockId(), block.getNumBytes(),
block.getGenerationStamp(), vol, dir); block.getGenerationStamp(), vol, dir);
} }
@ -69,7 +71,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
* @param dir directory path where block and meta files are located * @param dir directory path where block and meta files are located
*/ */
ReplicaInfo(long blockId, long len, long genStamp, ReplicaInfo(long blockId, long len, long genStamp,
FSVolume vol, File dir) { FSVolumeInterface vol, File dir) {
super(blockId, len, genStamp); super(blockId, len, genStamp);
this.volume = vol; this.volume = vol;
this.dir = dir; this.dir = dir;
@ -111,14 +113,14 @@ File getMetaFile() {
* Get the volume where this replica is located on disk * Get the volume where this replica is located on disk
* @return the volume where this replica is located on disk * @return the volume where this replica is located on disk
*/ */
FSVolume getVolume() { FSVolumeInterface getVolume() {
return volume; return volume;
} }
/** /**
* Set the volume where this replica is located on disk * Set the volume where this replica is located on disk
*/ */
void setVolume(FSVolume vol) { void setVolume(FSVolumeInterface vol) {
this.volume = vol; this.volume = vol;
} }
@ -162,7 +164,7 @@ void setUnlinked() {
* be recovered (especially on Windows) on datanode restart. * be recovered (especially on Windows) on datanode restart.
*/ */
private void unlinkFile(File file, Block b) throws IOException { private void unlinkFile(File file, Block b) throws IOException {
File tmpFile = DatanodeUtil.createTmpFile(b, FSDataset.getUnlinkTmpFile(file)); File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
try { try {
FileInputStream in = new FileInputStream(file); FileInputStream in = new FileInputStream(file);
try { try {

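With the replica hierarchy now typed against FSVolumeInterface, a replica can be constructed in a test without touching FSDataset.FSVolume at all. A minimal sketch, assuming Mockito is on the test classpath (as the updated tests later in this commit suggest); the id, length, generation stamp, and directory are sample values.

package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;

import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.mockito.Mockito;

class FinalizedReplicaSketch {
  static FinalizedReplica newSampleReplica() {
    // The volume is a mock: nothing here depends on the concrete FSVolume.
    FSVolumeInterface vol = Mockito.mock(FSVolumeInterface.class);
    File dir = new File("/data/1/current/bp-1/finalized");  // hypothetical path
    // blockId=1, length=1024, generation stamp=1001 are arbitrary sample values.
    FinalizedReplica replica = new FinalizedReplica(1L, 1024L, 1001L, vol, dir);
    // ReplicaInfo.getVolume() now hands back the interface type only.
    return replica;
  }
}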
View File

@ -20,7 +20,7 @@
import java.io.File; import java.io.File;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
/** /**
@ -145,7 +145,7 @@ void setDir(File dir) {
} }
@Override //ReplicaInfo @Override //ReplicaInfo
void setVolume(FSVolume vol) { void setVolume(FSVolumeInterface vol) {
super.setVolume(vol); super.setVolume(vol);
original.setVolume(vol); original.setVolume(vol);
} }

View File

@ -21,7 +21,7 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
/** /**
* This class represents a replica that is waiting to be recovered. * This class represents a replica that is waiting to be recovered.
@ -44,7 +44,7 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo {
* @param dir directory path where block and meta files are located * @param dir directory path where block and meta files are located
*/ */
ReplicaWaitingToBeRecovered(long blockId, long len, long genStamp, ReplicaWaitingToBeRecovered(long blockId, long len, long genStamp,
FSVolume vol, File dir) { FSVolumeInterface vol, File dir) {
super(blockId, len, genStamp, vol, dir); super(blockId, len, genStamp, vol, dir);
} }
@ -54,7 +54,7 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo {
* @param vol volume where replica is located * @param vol volume where replica is located
* @param dir directory path where block and meta files are located * @param dir directory path where block and meta files are located
*/ */
ReplicaWaitingToBeRecovered(Block block, FSVolume vol, File dir) { ReplicaWaitingToBeRecovered(Block block, FSVolumeInterface vol, File dir) {
super(block, vol, dir); super(block, vol, dir);
} }

View File

@ -20,7 +20,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy { public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy {
@ -28,8 +28,8 @@ public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy {
private int curVolume = 0; private int curVolume = 0;
@Override @Override
public synchronized FSVolume chooseVolume(List<FSVolume> volumes, long blockSize) public synchronized FSVolumeInterface chooseVolume(
throws IOException { List<FSVolumeInterface> volumes, long blockSize) throws IOException {
if(volumes.size() < 1) { if(volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes"); throw new DiskOutOfSpaceException("No more available volumes");
} }
@ -44,7 +44,7 @@ public synchronized FSVolume chooseVolume(List<FSVolume> volumes, long blockSize
long maxAvailable = 0; long maxAvailable = 0;
while (true) { while (true) {
FSVolume volume = volumes.get(curVolume); FSVolumeInterface volume = volumes.get(curVolume);
curVolume = (curVolume + 1) % volumes.size(); curVolume = (curVolume + 1) % volumes.size();
long availableVolumeSize = volume.getAvailable(); long availableVolumeSize = volume.getAvailable();
if (availableVolumeSize > blockSize) { return volume; } if (availableVolumeSize > blockSize) { return volume; }

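The updated chooseVolume signature is what allows alternative pluggable policies to be written without referring to FSDataset.FSVolume. A hedged sketch of one such policy, assuming chooseVolume is the only method the BlockVolumeChoosingPolicy contract requires and that getAvailable() reports free space in bytes; it is illustrative only and not part of this commit.

package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

/** Picks whichever volume currently has the most available space. */
public class MostAvailableSpacePolicy implements BlockVolumeChoosingPolicy {
  @Override
  public synchronized FSVolumeInterface chooseVolume(
      List<FSVolumeInterface> volumes, long blockSize) throws IOException {
    if (volumes.isEmpty()) {
      throw new DiskOutOfSpaceException("No more available volumes");
    }
    FSVolumeInterface best = null;
    long bestAvailable = -1;
    for (FSVolumeInterface v : volumes) {
      long available = v.getAvailable();
      if (available > bestAvailable) {
        bestAvailable = available;
        best = v;
      }
    }
    if (bestAvailable <= blockSize) {
      throw new DiskOutOfSpaceException("Best volume has only " + bestAvailable
          + " bytes free but the block needs " + blockSize);
    }
    return best;
  }
}

Like the round-robin policy above, the sketch signals exhaustion with DiskOutOfSpaceException so callers can treat either policy uniformly.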
View File

@ -17,12 +17,14 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
@ -38,11 +40,10 @@
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.BlockPoolSlice;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet; import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@ -988,8 +989,33 @@ public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary)
} }
@Override @Override
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
throws IOException { throw new UnsupportedOperationException();
throw new IOException("getBlockLocalPathInfo not supported."); }
@Override
public String[] getBlockPoolList() {
throw new UnsupportedOperationException();
}
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FSVolumeInterface vol) {
throw new UnsupportedOperationException();
}
@Override
public List<FSVolumeInterface> getVolumes() {
throw new UnsupportedOperationException();
}
@Override
public List<Block> getFinalizedBlocks(String bpid) {
throw new UnsupportedOperationException();
}
@Override
public Map<String, Object> getVolumeInfoMap() {
throw new UnsupportedOperationException();
} }
} }

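Since the simulated dataset answers the new accessors with UnsupportedOperationException, code that may be handed it in tests could guard those calls. A small sketch; the zero fallback is an assumption for illustration, not behaviour defined by this change.

package org.apache.hadoop.hdfs.server.datanode;

class VolumeCountSketch {
  /** @return the number of volumes, or 0 for datasets (such as the simulated
   *  one) that do not expose real volumes. */
  static int volumeCountOrZero(FSDatasetInterface dataset) {
    try {
      return dataset.getVolumes().size();
    } catch (UnsupportedOperationException e) {
      return 0;
    }
  }
}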
View File

@ -23,7 +23,7 @@
import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNotSame;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -31,7 +31,6 @@
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.junit.Assert; import org.junit.Assert;
@ -81,11 +80,11 @@ public void test2NNRegistration() throws IOException {
// check number of volumes in fsdataset // check number of volumes in fsdataset
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
Collection<VolumeInfo> volInfos = ((FSDataset) dn.data).getVolumeInfo(); final Map<String, Object> volInfos = dn.data.getVolumeInfoMap();
assertNotNull("No volumes in the fsdataset", volInfos); Assert.assertTrue("No volumes in the fsdataset", volInfos.size() > 0);
int i = 0; int i = 0;
for (VolumeInfo vi : volInfos) { for (Map.Entry<String, Object> e : volInfos.entrySet()) {
LOG.info("vol " + i++ + ";dir=" + vi.directory + ";fs= " + vi.freeSpace); LOG.info("vol " + i++ + ") " + e.getKey() + ": " + e.getValue());
} }
// number of volumes should be 2 - [data1, data2] // number of volumes should be 2 - [data1, data2]
assertEquals("number of volumes is wrong", 2, volInfos.size()); assertEquals("number of volumes is wrong", 2, volInfos.size());
@ -143,11 +142,11 @@ public void testFedSingleNN() throws IOException {
// check number of volumes in fsdataset // check number of volumes in fsdataset
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
Collection<VolumeInfo> volInfos = ((FSDataset) dn.data).getVolumeInfo(); final Map<String, Object> volInfos = dn.data.getVolumeInfoMap();
assertNotNull("No volumes in the fsdataset", volInfos); Assert.assertTrue("No volumes in the fsdataset", volInfos.size() > 0);
int i = 0; int i = 0;
for (VolumeInfo vi : volInfos) { for (Map.Entry<String, Object> e : volInfos.entrySet()) {
LOG.info("vol " + i++ + ";dir=" + vi.directory + ";fs= " + vi.freeSpace); LOG.info("vol " + i++ + ") " + e.getKey() + ": " + e.getValue());
} }
// number of volumes should be 2 - [data1, data2] // number of volumes should be 2 - [data1, data2]
assertEquals("number of volumes is wrong", 2, volInfos.size()); assertEquals("number of volumes is wrong", 2, volInfos.size());

View File

@ -17,6 +17,9 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File; import java.io.File;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
@ -29,8 +32,8 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -43,13 +46,10 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
/** /**
* Fine-grain testing of block files and locations after volume failure. * Fine-grain testing of block files and locations after volume failure.
@ -274,7 +274,6 @@ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
String file = BlockReaderFactory.getFileName(targetAddr, String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", "test-blockpoolid",
block.getBlockId()); block.getBlockId());
BlockReader blockReader =
BlockReaderFactory.newBlockReader(conf, s, file, block, lblock BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
.getBlockToken(), 0, -1); .getBlockToken(), 0, -1);
@ -372,7 +371,7 @@ private int countRealBlocks(Map<String, BlockLocs> map) {
new FilenameFilter() { new FilenameFilter() {
public boolean accept(File dir, String name) { public boolean accept(File dir, String name) {
return name.startsWith("blk_") && return name.startsWith("blk_") &&
name.endsWith(FSDataset.METADATA_EXTENSION); name.endsWith(DatanodeUtil.METADATA_EXTENSION);
} }
} }
); );

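With the metadata extension now published by DatanodeUtil, file-classification helpers no longer need FSDataset either. A sketch in the spirit of the filter above, assuming the usual blk_ prefix for both block and metadata files; the helper itself is not part of the commit.

package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;

import org.apache.hadoop.hdfs.protocol.Block;

class BlockFileCountSketch {
  /** Counts block files and metadata files directly under one directory. */
  static int[] countBlockAndMetaFiles(File dir) {
    int blocks = 0;
    int metas = 0;
    File[] files = dir.listFiles();
    if (files != null) {
      for (File f : files) {
        if (Block.isBlockFilename(f)) {
          blocks++;
        } else if (f.getName().startsWith("blk_")
            && f.getName().endsWith(DatanodeUtil.METADATA_EXTENSION)) {
          metas++;
        }
      }
    }
    return new int[] { blocks, metas };
  }
}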
View File

@ -30,17 +30,17 @@
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.junit.Test;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test;
/** Test if a datanode can correctly upgrade itself */ /** Test if a datanode can correctly upgrade itself */
public class TestDatanodeRestart { public class TestDatanodeRestart {
@ -98,8 +98,9 @@ private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt)
out.write(writeBuf); out.write(writeBuf);
out.hflush(); out.hflush();
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
for (FSVolume volume : ((FSDataset)dn.data).volumes.getVolumes()) { for (FSVolumeInterface v : dn.data.getVolumes()) {
File currentDir = volume.getDir().getParentFile(); FSVolume volume = (FSVolume)v;
File currentDir = volume.getCurrentDir().getParentFile().getParentFile();
File rbwDir = new File(currentDir, "rbw"); File rbwDir = new File(currentDir, "rbw");
for (File file : rbwDir.listFiles()) { for (File file : rbwDir.listFiles()) {
if (isCorrupt && Block.isBlockFilename(file)) { if (isCorrupt && Block.isBlockFilename(file)) {
@ -188,7 +189,7 @@ private static void createUnlinkTmpFile(ReplicaInfo replicaInfo,
} else { } else {
src = replicaInfo.getMetaFile(); src = replicaInfo.getMetaFile();
} }
File dst = FSDataset.getUnlinkTmpFile(src); File dst = DatanodeUtil.getUnlinkTmpFile(src);
if (isRename) { if (isRename) {
src.renameTo(dst); src.renameTo(dst);
} else { } else {

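The hunk above shows the pattern for tests that still need implementation-specific accessors: enumerate volumes through the interface, then downcast only where a concrete FSVolume method such as getCurrentDir() is required. A minimal sketch of that pattern; the helper and its name are illustrative.

package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;

class VolumeDirSketch {
  /** Collects each volume's current directory; the downcast is confined to
   *  this one helper so the rest of the test stays interface-only. */
  static List<File> currentDirs(FSDatasetInterface dataset) {
    List<File> dirs = new ArrayList<File>();
    for (FSVolumeInterface v : dataset.getVolumes()) {
      FSVolume volume = (FSVolume) v;  // test-only downcast
      dirs.add(volume.getCurrentDir());
    }
    return dirs;
  }
}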
View File

@ -25,20 +25,20 @@
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import junit.framework.TestCase;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import junit.framework.TestCase;
/** /**
* Tests {@link DirectoryScanner} handling of differences * Tests {@link DirectoryScanner} handling of differences
@ -142,10 +142,10 @@ private String getMetaFile(long id) {
/** Create a block file in a random volume*/ /** Create a block file in a random volume*/
private long createBlockFile() throws IOException { private long createBlockFile() throws IOException {
List<FSVolume> volumes = fds.volumes.getVolumes(); List<FSVolumeInterface> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1); int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId(); long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir(); File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id)); File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) { if (file.createNewFile()) {
LOG.info("Created block file " + file.getName()); LOG.info("Created block file " + file.getName());
@ -155,10 +155,10 @@ private long createBlockFile() throws IOException {
/** Create a metafile in a random volume*/ /** Create a metafile in a random volume*/
private long createMetaFile() throws IOException { private long createMetaFile() throws IOException {
List<FSVolume> volumes = fds.volumes.getVolumes(); List<FSVolumeInterface> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1); int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId(); long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir(); File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getMetaFile(id)); File file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) { if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName()); LOG.info("Created metafile " + file.getName());
@ -168,10 +168,10 @@ private long createMetaFile() throws IOException {
/** Create block file and corresponding metafile in a random volume */ /** Create block file and corresponding metafile in a random volume */
private long createBlockMetaFile() throws IOException { private long createBlockMetaFile() throws IOException {
List<FSVolume> volumes = fds.volumes.getVolumes(); List<FSVolumeInterface> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1); int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId(); long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getBlockPoolSlice(bpid).getFinalizedDir(); File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id)); File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) { if (file.createNewFile()) {
LOG.info("Created block file " + file.getName()); LOG.info("Created block file " + file.getName());

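getFinalizedDir(bpid) replaces reaching through getBlockPoolSlice(bpid) in the three helpers above. A combined sketch that creates a block file on a randomly chosen volume, assuming finalized block files keep the blk_<id> naming shown elsewhere in this commit; it is not part of the change.

package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;

class FinalizedDirSketch {
  private static final Random rand = new Random();

  /** Creates an empty block file with the given id on a random volume. */
  static File createBlockFile(FSDatasetInterface fds, String bpid, long id)
      throws IOException {
    List<FSVolumeInterface> volumes = fds.getVolumes();
    FSVolumeInterface vol = volumes.get(rand.nextInt(volumes.size()));
    File finalizedDir = vol.getFinalizedDir(bpid);  // new interface accessor
    File block = new File(finalizedDir, "blk_" + id);
    if (!block.createNewFile()) {
      throw new IOException("Could not create " + block);
    }
    return block;
  }
}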
View File

@ -21,10 +21,10 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import junit.framework.Assert; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -33,14 +33,14 @@ public class TestRoundRobinVolumesPolicy {
// Test the Round-Robin block-volume choosing algorithm. // Test the Round-Robin block-volume choosing algorithm.
@Test @Test
public void testRR() throws Exception { public void testRR() throws Exception {
final List<FSVolume> volumes = new ArrayList<FSVolume>(); final List<FSVolumeInterface> volumes = new ArrayList<FSVolumeInterface>();
// First volume, with 100 bytes of space. // First volume, with 100 bytes of space.
volumes.add(Mockito.mock(FSVolume.class)); volumes.add(Mockito.mock(FSVolumeInterface.class));
Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L); Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
// Second volume, with 200 bytes of space. // Second volume, with 200 bytes of space.
volumes.add(Mockito.mock(FSVolume.class)); volumes.add(Mockito.mock(FSVolumeInterface.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L); Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
RoundRobinVolumesPolicy policy = ReflectionUtils.newInstance( RoundRobinVolumesPolicy policy = ReflectionUtils.newInstance(
@ -69,14 +69,14 @@ public void testRR() throws Exception {
@Test @Test
public void testRRPolicyExceptionMessage() public void testRRPolicyExceptionMessage()
throws Exception { throws Exception {
final List<FSVolume> volumes = new ArrayList<FSVolume>(); final List<FSVolumeInterface> volumes = new ArrayList<FSVolumeInterface>();
// First volume, with 500 bytes of space. // First volume, with 500 bytes of space.
volumes.add(Mockito.mock(FSVolume.class)); volumes.add(Mockito.mock(FSVolumeInterface.class));
Mockito.when(volumes.get(0).getAvailable()).thenReturn(500L); Mockito.when(volumes.get(0).getAvailable()).thenReturn(500L);
// Second volume, with 600 bytes of space. // Second volume, with 600 bytes of space.
volumes.add(Mockito.mock(FSVolume.class)); volumes.add(Mockito.mock(FSVolumeInterface.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L); Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
RoundRobinVolumesPolicy policy = new RoundRobinVolumesPolicy(); RoundRobinVolumesPolicy policy = new RoundRobinVolumesPolicy();

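The hunks above end before the test's assertions. A sketch of how the rotation could be checked against the mocked volumes, following the chooseVolume logic visible earlier in this commit; the assertions are an assumption for illustration, not taken from the actual test.

package org.apache.hadoop.hdfs.server.datanode;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class RoundRobinRotationSketch {
  @Test
  public void rotatesOverVolumesWithEnoughSpace() throws Exception {
    final List<FSVolumeInterface> volumes = new ArrayList<FSVolumeInterface>();
    // Two mocked volumes with 100 and 200 bytes free, as in the test above.
    volumes.add(Mockito.mock(FSVolumeInterface.class));
    Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
    volumes.add(Mockito.mock(FSVolumeInterface.class));
    Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);

    RoundRobinVolumesPolicy policy = new RoundRobinVolumesPolicy();
    // With a 50-byte block both volumes qualify, so calls should alternate.
    Assert.assertSame(volumes.get(0), policy.chooseVolume(volumes, 50));
    Assert.assertSame(volumes.get(1), policy.chooseVolume(volumes, 50));
    Assert.assertSame(volumes.get(0), policy.chooseVolume(volumes, 50));
  }
}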
View File

@ -140,7 +140,7 @@ private ExtendedBlock[] setup(String bpid, FSDataset dataSet) throws IOException
ReplicasMap replicasMap = dataSet.volumeMap; ReplicasMap replicasMap = dataSet.volumeMap;
FSVolume vol = dataSet.volumes.getNextVolume(0); FSVolume vol = dataSet.volumes.getNextVolume(0);
ReplicaInfo replicaInfo = new FinalizedReplica( ReplicaInfo replicaInfo = new FinalizedReplica(
blocks[FINALIZED].getLocalBlock(), vol, vol.getDir()); blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
replicasMap.add(bpid, replicaInfo); replicasMap.add(bpid, replicaInfo);
replicaInfo.getBlockFile().createNewFile(); replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile(); replicaInfo.getMetaFile().createNewFile();
@ -160,15 +160,15 @@ private ExtendedBlock[] setup(String bpid, FSDataset dataSet) throws IOException
blocks[RWR].getLocalBlock(), vol, vol.createRbwFile(bpid, blocks[RWR].getLocalBlock(), vol, vol.createRbwFile(bpid,
blocks[RWR].getLocalBlock()).getParentFile())); blocks[RWR].getLocalBlock()).getParentFile()));
replicasMap.add(bpid, new ReplicaUnderRecovery(new FinalizedReplica(blocks[RUR] replicasMap.add(bpid, new ReplicaUnderRecovery(new FinalizedReplica(blocks[RUR]
.getLocalBlock(), vol, vol.getDir()), 2007)); .getLocalBlock(), vol, vol.getCurrentDir().getParentFile()), 2007));
return blocks; return blocks;
} }
private void testAppend(String bpid, FSDataset dataSet, ExtendedBlock[] blocks) throws IOException { private void testAppend(String bpid, FSDataset dataSet, ExtendedBlock[] blocks) throws IOException {
long newGS = blocks[FINALIZED].getGenerationStamp()+1; long newGS = blocks[FINALIZED].getGenerationStamp()+1;
FSVolume v = dataSet.volumeMap.get(bpid, blocks[FINALIZED].getLocalBlock()) final FSVolume v = (FSVolume)dataSet.volumeMap.get(
.getVolume(); bpid, blocks[FINALIZED].getLocalBlock()).getVolume();
long available = v.getCapacity()-v.getDfsUsed(); long available = v.getCapacity()-v.getDfsUsed();
long expectedLen = blocks[FINALIZED].getNumBytes(); long expectedLen = blocks[FINALIZED].getNumBytes();
try { try {