HDFS-10637. Modifications to remove the assumption that FsVolumes are backed by java.io.File. (Virajith Jalaparti via lei)

This commit is contained in:
Lei Xu 2016-10-10 15:28:19 -07:00
parent 0773ffd0f8
commit 96b12662ea
35 changed files with 1062 additions and 713 deletions

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.ToolRunner;
@ -269,11 +270,17 @@ public abstract class Storage extends StorageInfo {
private String storageUuid = null; // Storage directory identifier.
private final StorageLocation location;
public StorageDirectory(File dir) {
// default dirType is null
this(dir, null, false);
}
public StorageDirectory(StorageLocation location) {
// default dirType is null
this(location.getFile(), null, false, location);
}
public StorageDirectory(File dir, StorageDirType dirType) {
this(dir, dirType, false);
}
@ -294,11 +301,22 @@ public abstract class Storage extends StorageInfo {
* disables locking on the storage directory, false enables locking
*/
public StorageDirectory(File dir, StorageDirType dirType, boolean isShared) {
this(dir, dirType, isShared, null);
}
public StorageDirectory(File dir, StorageDirType dirType,
boolean isShared, StorageLocation location) {
this.root = dir;
this.lock = null;
this.dirType = dirType;
this.isShared = isShared;
this.location = location;
assert location == null ||
dir.getAbsolutePath().startsWith(
location.getFile().getAbsolutePath()):
"The storage location and directory should be equal";
}
/**
* Get root directory of this storage
@ -861,6 +879,10 @@ public abstract class Storage extends StorageInfo {
}
return false;
}
public StorageLocation getStorageLocation() {
return location;
}
}
/**

View File

@ -147,10 +147,10 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException
*/
private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
File dataDir, StartupOption startOpt,
File dataDir, StorageLocation location, StartupOption startOpt,
List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, true);
StorageDirectory sd = new StorageDirectory(dataDir, null, true, location);
try {
StorageState curState = sd.analyzeStorage(startOpt, this, true);
// sd is locked but not opened
@ -208,9 +208,9 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException on error
*/
List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt,
List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException {
Collection<File> dataDirs, StorageLocation location,
StartupOption startOpt, List<Callable<StorageDirectory>> callables,
Configuration conf) throws IOException {
List<StorageDirectory> succeedDirs = Lists.newArrayList();
try {
for (File dataDir : dataDirs) {
@ -220,7 +220,7 @@ public class BlockPoolSliceStorage extends Storage {
"attempt to load an used block storage: " + dataDir);
}
final StorageDirectory sd = loadStorageDirectory(
nsInfo, dataDir, startOpt, callables, conf);
nsInfo, dataDir, location, startOpt, callables, conf);
succeedDirs.add(sd);
}
} catch (IOException e) {
@ -244,12 +244,12 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException on error
*/
List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt,
List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException {
Collection<File> dataDirs, StorageLocation location,
StartupOption startOpt, List<Callable<StorageDirectory>> callables,
Configuration conf) throws IOException {
LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
final List<StorageDirectory> loaded = loadBpStorageDirectories(
nsInfo, dataDirs, startOpt, callables, conf);
nsInfo, dataDirs, location, startOpt, callables, conf);
for (StorageDirectory sd : loaded) {
addStorageDir(sd);
}

View File

@ -201,17 +201,17 @@ public class BlockScanner {
FsVolumeSpi volume = ref.getVolume();
if (!isEnabled()) {
LOG.debug("Not adding volume scanner for {}, because the block " +
"scanner is disabled.", volume.getBasePath());
"scanner is disabled.", volume);
return;
}
VolumeScanner scanner = scanners.get(volume.getStorageID());
if (scanner != null) {
LOG.error("Already have a scanner for volume {}.",
volume.getBasePath());
volume);
return;
}
LOG.debug("Adding scanner for volume {} (StorageID {})",
volume.getBasePath(), volume.getStorageID());
volume, volume.getStorageID());
scanner = new VolumeScanner(conf, datanode, ref);
scanner.start();
scanners.put(volume.getStorageID(), scanner);
@ -245,7 +245,7 @@ public class BlockScanner {
return;
}
LOG.info("Removing scanner for volume {} (StorageID {})",
volume.getBasePath(), volume.getStorageID());
volume, volume.getStorageID());
scanner.shutdown();
scanners.remove(volume.getStorageID());
Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);

View File

@ -58,7 +58,6 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -78,7 +77,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -791,11 +789,7 @@ public class DataNode extends ReconfigurableBase
if (locations.isEmpty()) {
return;
}
Set<File> volumesToRemove = new HashSet<>();
for (StorageLocation loc : locations) {
volumesToRemove.add(loc.getFile().getAbsoluteFile());
}
removeVolumes(volumesToRemove, true);
removeVolumes(locations, true);
}
/**
@ -814,26 +808,22 @@ public class DataNode extends ReconfigurableBase
* @throws IOException
*/
private synchronized void removeVolumes(
final Set<File> absoluteVolumePaths, boolean clearFailure)
final Collection<StorageLocation> storageLocations, boolean clearFailure)
throws IOException {
for (File vol : absoluteVolumePaths) {
Preconditions.checkArgument(vol.isAbsolute());
}
if (absoluteVolumePaths.isEmpty()) {
if (storageLocations.isEmpty()) {
return;
}
LOG.info(String.format("Deactivating volumes (clear failure=%b): %s",
clearFailure, Joiner.on(",").join(absoluteVolumePaths)));
clearFailure, Joiner.on(",").join(storageLocations)));
IOException ioe = null;
// Remove volumes and block infos from FsDataset.
data.removeVolumes(absoluteVolumePaths, clearFailure);
data.removeVolumes(storageLocations, clearFailure);
// Remove volumes from DataStorage.
try {
storage.removeVolumes(absoluteVolumePaths);
storage.removeVolumes(storageLocations);
} catch (IOException e) {
ioe = e;
}
@ -841,7 +831,7 @@ public class DataNode extends ReconfigurableBase
// Set configuration and dataDirs to reflect volume changes.
for (Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext(); ) {
StorageLocation loc = it.next();
if (absoluteVolumePaths.contains(loc.getFile().getAbsoluteFile())) {
if (storageLocations.contains(loc)) {
it.remove();
}
}
@ -3242,18 +3232,18 @@ public class DataNode extends ReconfigurableBase
* Check the disk error
*/
private void checkDiskError() {
Set<File> unhealthyDataDirs = data.checkDataDir();
if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
Set<StorageLocation> unhealthyLocations = data.checkDataDir();
if (unhealthyLocations != null && !unhealthyLocations.isEmpty()) {
try {
// Remove all unhealthy volumes from DataNode.
removeVolumes(unhealthyDataDirs, false);
removeVolumes(unhealthyLocations, false);
} catch (IOException e) {
LOG.warn("Error occurred when removing unhealthy storage dirs: "
+ e.getMessage(), e);
}
StringBuilder sb = new StringBuilder("DataNode failed volumes:");
for (File dataDir : unhealthyDataDirs) {
sb.append(dataDir.getAbsolutePath() + ";");
for (StorageLocation location : unhealthyLocations) {
sb.append(location + ";");
}
handleDiskError(sb.toString());
}

View File

@ -263,9 +263,10 @@ public class DataStorage extends Storage {
}
private StorageDirectory loadStorageDirectory(DataNode datanode,
NamespaceInfo nsInfo, File dataDir, StartupOption startOpt,
List<Callable<StorageDirectory>> callables) throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, false);
NamespaceInfo nsInfo, File dataDir, StorageLocation location,
StartupOption startOpt, List<Callable<StorageDirectory>> callables)
throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, false, location);
try {
StorageState curState = sd.analyzeStorage(startOpt, this, true);
// sd is locked but not opened
@ -310,7 +311,7 @@ public class DataStorage extends Storage {
* builder later.
*
* @param datanode DataNode object.
* @param volume the root path of a storage directory.
* @param location the StorageLocation for the storage directory.
* @param nsInfos an array of namespace infos.
* @return a VolumeBuilder that holds the metadata of this storage directory
* and can be added to DataStorage later.
@ -318,8 +319,10 @@ public class DataStorage extends Storage {
*
* Note that if there is IOException, the state of DataStorage is not modified.
*/
public VolumeBuilder prepareVolume(DataNode datanode, File volume,
List<NamespaceInfo> nsInfos) throws IOException {
public VolumeBuilder prepareVolume(DataNode datanode,
StorageLocation location, List<NamespaceInfo> nsInfos)
throws IOException {
File volume = location.getFile();
if (containsStorageDir(volume)) {
final String errorMessage = "Storage directory is in use";
LOG.warn(errorMessage + ".");
@ -327,7 +330,8 @@ public class DataStorage extends Storage {
}
StorageDirectory sd = loadStorageDirectory(
datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP, null);
datanode, nsInfos.get(0), volume, location,
StartupOption.HOTSWAP, null);
VolumeBuilder builder =
new VolumeBuilder(this, sd);
for (NamespaceInfo nsInfo : nsInfos) {
@ -338,7 +342,8 @@ public class DataStorage extends Storage {
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
nsInfo, bpDataDirs, StartupOption.HOTSWAP, null, datanode.getConf());
nsInfo, bpDataDirs, location, StartupOption.HOTSWAP,
null, datanode.getConf());
builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
}
return builder;
@ -407,7 +412,7 @@ public class DataStorage extends Storage {
final List<Callable<StorageDirectory>> callables
= Lists.newArrayList();
final StorageDirectory sd = loadStorageDirectory(
datanode, nsInfo, root, startOpt, callables);
datanode, nsInfo, root, dataDir, startOpt, callables);
if (callables.isEmpty()) {
addStorageDir(sd);
success.add(dataDir);
@ -458,7 +463,8 @@ public class DataStorage extends Storage {
final List<Callable<StorageDirectory>> callables = Lists.newArrayList();
final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
nsInfo, bpDataDirs, startOpt, callables, datanode.getConf());
nsInfo, bpDataDirs, dataDir, startOpt,
callables, datanode.getConf());
if (callables.isEmpty()) {
for(StorageDirectory sd : dirs) {
success.add(sd);
@ -498,9 +504,10 @@ public class DataStorage extends Storage {
* @param dirsToRemove a set of storage directories to be removed.
* @throws IOException if I/O error when unlocking storage directory.
*/
synchronized void removeVolumes(final Set<File> dirsToRemove)
synchronized void removeVolumes(
final Collection<StorageLocation> storageLocations)
throws IOException {
if (dirsToRemove.isEmpty()) {
if (storageLocations.isEmpty()) {
return;
}
@ -508,7 +515,8 @@ public class DataStorage extends Storage {
for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
it.hasNext(); ) {
StorageDirectory sd = it.next();
if (dirsToRemove.contains(sd.getRoot())) {
StorageLocation sdLocation = sd.getStorageLocation();
if (storageLocations.contains(sdLocation)) {
// Remove the block pool level storage first.
for (Map.Entry<String, BlockPoolSliceStorage> entry :
this.bpStorageMap.entrySet()) {

View File

@ -22,7 +22,6 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -37,9 +36,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -47,10 +43,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
@ -209,200 +204,6 @@ public class DirectoryScanner implements Runnable {
}
}
/**
* Tracks the files and other information related to a block on the disk
* Missing file is indicated by setting the corresponding member
* to null.
*
* Because millions of these structures may be created, we try to save
* memory here. So instead of storing full paths, we store path suffixes.
* The block file, if it exists, will have a path like this:
* <volume_base_path>/<block_path>
* So we don't need to store the volume path, since we already know what the
* volume is.
*
* The metadata file, if it exists, will have a path like this:
* <volume_base_path>/<block_path>_<genstamp>.meta
* So if we have a block file, there isn't any need to store the block path
* again.
*
* The accessor functions take care of these manipulations.
*/
static class ScanInfo implements Comparable<ScanInfo> {
private final long blockId;
/**
* The block file path, relative to the volume's base directory.
* If there was no block file found, this may be null. If 'vol'
* is null, then this is the full path of the block file.
*/
private final String blockSuffix;
/**
* The suffix of the meta file path relative to the block file.
* If blockSuffix is null, then this will be the entire path relative
* to the volume base directory, or an absolute path if vol is also
* null.
*/
private final String metaSuffix;
private final FsVolumeSpi volume;
/**
* Get the file's length in async block scan
*/
private final long blockFileLength;
private final static Pattern CONDENSED_PATH_REGEX =
Pattern.compile("(?<!^)(\\\\|/){2,}");
private final static String QUOTED_FILE_SEPARATOR =
Matcher.quoteReplacement(File.separator);
/**
* Get the most condensed version of the path.
*
* For example, the condensed version of /foo//bar is /foo/bar
* Unlike {@link File#getCanonicalPath()}, this will never perform I/O
* on the filesystem.
*
* @param path the path to condense
* @return the condensed path
*/
private static String getCondensedPath(String path) {
return CONDENSED_PATH_REGEX.matcher(path).
replaceAll(QUOTED_FILE_SEPARATOR);
}
/**
* Get a path suffix.
*
* @param f The file to get the suffix for.
* @param prefix The prefix we're stripping off.
*
* @return A suffix such that prefix + suffix = path to f
*/
private static String getSuffix(File f, String prefix) {
String fullPath = getCondensedPath(f.getAbsolutePath());
if (fullPath.startsWith(prefix)) {
return fullPath.substring(prefix.length());
}
throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
}
/**
* Create a ScanInfo object for a block. This constructor will examine
* the block data and meta-data files.
*
* @param blockId the block ID
* @param blockFile the path to the block data file
* @param metaFile the path to the block meta-data file
* @param vol the volume that contains the block
*/
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
this.blockId = blockId;
String condensedVolPath = vol == null ? null :
getCondensedPath(vol.getBasePath());
this.blockSuffix = blockFile == null ? null :
getSuffix(blockFile, condensedVolPath);
this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
if (metaFile == null) {
this.metaSuffix = null;
} else if (blockFile == null) {
this.metaSuffix = getSuffix(metaFile, condensedVolPath);
} else {
this.metaSuffix = getSuffix(metaFile,
condensedVolPath + blockSuffix);
}
this.volume = vol;
}
/**
* Returns the block data file.
*
* @return the block data file
*/
File getBlockFile() {
return (blockSuffix == null) ? null :
new File(volume.getBasePath(), blockSuffix);
}
/**
* Return the length of the data block. The length returned is the length
* cached when this object was created.
*
* @return the length of the data block
*/
long getBlockFileLength() {
return blockFileLength;
}
/**
* Returns the block meta data file or null if there isn't one.
*
* @return the block meta data file
*/
File getMetaFile() {
if (metaSuffix == null) {
return null;
} else if (blockSuffix == null) {
return new File(volume.getBasePath(), metaSuffix);
} else {
return new File(volume.getBasePath(), blockSuffix + metaSuffix);
}
}
/**
* Returns the block ID.
*
* @return the block ID
*/
long getBlockId() {
return blockId;
}
/**
* Returns the volume that contains the block that this object describes.
*
* @return the volume
*/
FsVolumeSpi getVolume() {
return volume;
}
@Override // Comparable
public int compareTo(ScanInfo b) {
if (blockId < b.blockId) {
return -1;
} else if (blockId == b.blockId) {
return 0;
} else {
return 1;
}
}
@Override // Object
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ScanInfo)) {
return false;
}
return blockId == ((ScanInfo) o).blockId;
}
@Override // Object
public int hashCode() {
return (int)(blockId^(blockId>>>32));
}
public long getGenStamp() {
return metaSuffix != null ? Block.getGenerationStamp(
getMetaFile().getName()) :
HdfsConstants.GRANDFATHER_GENERATION_STAMP;
}
}
/**
* Create a new directory scanner, but don't cycle it running yet.
@ -644,7 +445,7 @@ public class DirectoryScanner implements Runnable {
// There may be multiple on-disk records for the same block, don't increment
// the memory record pointer if so.
ScanInfo nextInfo = blockpoolReport[Math.min(d, blockpoolReport.length - 1)];
if (nextInfo.getBlockId() != info.blockId) {
if (nextInfo.getBlockId() != info.getBlockId()) {
++m;
}
} else {
@ -762,19 +563,6 @@ public class DirectoryScanner implements Runnable {
return list.toSortedArrays();
}
/**
* Helper method to determine if a file name is consistent with a block.
* meta-data file
*
* @param blockId the block ID
* @param metaFile the file to check
* @return whether the file name is a block meta-data file name
*/
private static boolean isBlockMetaFile(String blockId, String metaFile) {
return metaFile.startsWith(blockId)
&& metaFile.endsWith(Block.METADATA_EXTENSION);
}
/**
* The ReportCompiler class encapsulates the process of searching a datanode's
* disks for block information. It operates by performing a DFS of the
@ -784,7 +572,7 @@ public class DirectoryScanner implements Runnable {
* ScanInfo object for it and adds that object to its report list. The report
* list is returned by the {@link #call()} method.
*/
private class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
public class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
private final FsVolumeSpi volume;
private final DataNode datanode;
// Variable for tracking time spent running for throttling purposes
@ -816,14 +604,12 @@ public class DirectoryScanner implements Runnable {
ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
for (String bpid : bpList) {
LinkedList<ScanInfo> report = new LinkedList<>();
File bpFinalizedDir = volume.getFinalizedDir(bpid);
perfTimer.start();
throttleTimer.start();
try {
result.put(bpid,
compileReport(volume, bpFinalizedDir, bpFinalizedDir, report));
result.put(bpid, volume.compileReport(bpid, report, this));
} catch (InterruptedException ex) {
// Exit quickly and flag the scanner to do the same
result = null;
@ -833,107 +619,13 @@ public class DirectoryScanner implements Runnable {
return result;
}
/**
* Compile a list of {@link ScanInfo} for the blocks in the directory
* given by {@code dir}.
*
* @param vol the volume that contains the directory to scan
* @param bpFinalizedDir the root directory of the directory to scan
* @param dir the directory to scan
* @param report the list onto which blocks reports are placed
*/
private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol,
File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
throws InterruptedException {
throttle();
List <String> fileNames;
try {
fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
} catch (IOException ioe) {
LOG.warn("Exception occured while compiling report: ", ioe);
// Initiate a check on disk failure.
datanode.checkDiskErrorAsync();
// Ignore this directory and proceed.
return report;
}
Collections.sort(fileNames);
/*
* Assumption: In the sorted list of files block file appears immediately
* before block metadata file. This is true for the current naming
* convention for block file blk_<blockid> and meta file
* blk_<blockid>_<genstamp>.meta
*/
for (int i = 0; i < fileNames.size(); i++) {
// Make sure this thread can make a timely exit. With a low throttle
// rate, completing a run can take a looooong time.
if (Thread.interrupted()) {
throw new InterruptedException();
}
File file = new File(dir, fileNames.get(i));
if (file.isDirectory()) {
compileReport(vol, bpFinalizedDir, file, report);
continue;
}
if (!Block.isBlockFilename(file)) {
if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
long blockId = Block.getBlockId(file.getName());
verifyFileLocation(file.getParentFile(), bpFinalizedDir,
blockId);
report.add(new ScanInfo(blockId, null, file, vol));
}
continue;
}
File blockFile = file;
long blockId = Block.filename2id(file.getName());
File metaFile = null;
// Skip all the files that start with block name until
// getting to the metafile for the block
while (i + 1 < fileNames.size()) {
File blkMetaFile = new File(dir, fileNames.get(i + 1));
if (!(blkMetaFile.isFile()
&& blkMetaFile.getName().startsWith(blockFile.getName()))) {
break;
}
i++;
if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
metaFile = blkMetaFile;
break;
}
}
verifyFileLocation(blockFile, bpFinalizedDir, blockId);
report.add(new ScanInfo(blockId, blockFile, metaFile, vol));
}
return report;
}
/**
* Verify whether the actual directory location of block file has the
* expected directory path computed using its block ID.
*/
private void verifyFileLocation(File actualBlockFile,
File bpFinalizedDir, long blockId) {
File expectedBlockDir =
DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
File actualBlockDir = actualBlockFile.getParentFile();
if (actualBlockDir.compareTo(expectedBlockDir) != 0) {
LOG.warn("Block: " + blockId +
" found in invalid directory. Expected directory: " +
expectedBlockDir + ". Actual directory: " + actualBlockDir);
}
}
/**
* Called by the thread before each potential disk scan so that a pause
* can be optionally inserted to limit the number of scans per second.
* The limit is controlled by
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
*/
private void throttle() throws InterruptedException {
public void throttle() throws InterruptedException {
accumulateTimeRunning();
if ((throttleLimitMsPerSec < 1000) &&
@ -963,7 +655,7 @@ public class DirectoryScanner implements Runnable {
}
}
private enum BlockDirFilter implements FilenameFilter {
public enum BlockDirFilter implements FilenameFilter {
INSTANCE;
@Override

View File

@ -500,7 +500,8 @@ public class DiskBalancer {
references = this.dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
storageIDToVolBasePathMap.put(vol.getStorageID(), vol.getBasePath());
storageIDToVolBasePathMap.put(vol.getStorageID(),
vol.getBaseURI().getPath());
}
references.close();
}
@ -1023,7 +1024,7 @@ public class DiskBalancer {
openPoolIters(source, poolIters);
if (poolIters.size() == 0) {
LOG.error("No block pools found on volume. volume : {}. Exiting.",
source.getBasePath());
source.getBaseURI());
return;
}
@ -1033,17 +1034,16 @@ public class DiskBalancer {
// Check for the max error count constraint.
if (item.getErrorCount() > getMaxError(item)) {
LOG.error("Exceeded the max error count. source {}, dest: {} " +
"error count: {}", source.getBasePath(),
dest.getBasePath(), item.getErrorCount());
this.setExitFlag();
continue;
"error count: {}", source.getBaseURI(),
dest.getBaseURI(), item.getErrorCount());
break;
}
// Check for the block tolerance constraint.
if (isCloseEnough(item)) {
LOG.info("Copy from {} to {} done. copied {} bytes and {} " +
"blocks.",
source.getBasePath(), dest.getBasePath(),
source.getBaseURI(), dest.getBaseURI(),
item.getBytesCopied(), item.getBlocksCopied());
this.setExitFlag();
continue;
@ -1053,7 +1053,7 @@ public class DiskBalancer {
// we are not able to find any blocks to copy.
if (block == null) {
LOG.error("No source blocks, exiting the copy. Source: {}, " +
"Dest:{}", source.getBasePath(), dest.getBasePath());
"Dest:{}", source.getBaseURI(), dest.getBaseURI());
this.setExitFlag();
continue;
}
@ -1081,14 +1081,13 @@ public class DiskBalancer {
// exiting here.
LOG.error("Destination volume: {} does not have enough space to" +
" accommodate a block. Block Size: {} Exiting from" +
" copyBlocks.", dest.getBasePath(), block.getNumBytes());
this.setExitFlag();
continue;
" copyBlocks.", dest.getBaseURI(), block.getNumBytes());
break;
}
LOG.debug("Moved block with size {} from {} to {}",
block.getNumBytes(), source.getBasePath(),
dest.getBasePath());
block.getNumBytes(), source.getBaseURI(),
dest.getBaseURI());
// Check for the max throughput constraint.
// We sleep here to keep the promise that we will not

View File

@ -39,8 +39,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.io.IOUtils;

View File

@ -25,8 +25,8 @@ import java.net.URI;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.util.LightWeightResizableGSet;

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.StringUtils;
/**
* Encapsulates the URI and storage medium that together describe a
* storage directory.
@ -37,7 +38,7 @@ import org.apache.hadoop.util.StringUtils;
*
*/
@InterfaceAudience.Private
public class StorageLocation {
public class StorageLocation implements Comparable<StorageLocation>{
final StorageType storageType;
final File file;
@ -104,16 +105,37 @@ public class StorageLocation {
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj == null || !(obj instanceof StorageLocation)) {
if (obj == null || !(obj instanceof StorageLocation)) {
return false;
}
return toString().equals(obj.toString());
int comp = compareTo((StorageLocation) obj);
return comp == 0;
}
@Override
public int hashCode() {
return toString().hashCode();
}
@Override
public int compareTo(StorageLocation obj) {
if (obj == this) {
return 0;
} else if (obj == null) {
return -1;
}
StorageLocation otherStorage = (StorageLocation) obj;
if (this.getFile() != null && otherStorage.getFile() != null) {
return this.getFile().getAbsolutePath().compareTo(
otherStorage.getFile().getAbsolutePath());
} else if (this.getFile() == null && otherStorage.getFile() == null) {
return this.storageType.compareTo(otherStorage.getStorageType());
} else if (this.getFile() == null) {
return -1;
} else {
return 1;
}
}
}

View File

@ -217,7 +217,7 @@ public class VolumeScanner extends Thread {
public void printStats(StringBuilder p) {
p.append(String.format("Block scanner information for volume %s with base" +
" path %s%n", volume.getStorageID(), volume.getBasePath()));
" path %s%n", volume.getStorageID(), volume));
synchronized (stats) {
p.append(String.format("Bytes verified in last hour : %57d%n",
stats.bytesScannedInPastHour));
@ -253,20 +253,20 @@ public class VolumeScanner extends Thread {
public void setup(VolumeScanner scanner) {
LOG.trace("Starting VolumeScanner {}",
scanner.volume.getBasePath());
scanner.volume);
this.scanner = scanner;
}
public void handle(ExtendedBlock block, IOException e) {
FsVolumeSpi volume = scanner.volume;
if (e == null) {
LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath());
LOG.trace("Successfully scanned {} on {}", block, volume);
return;
}
// If the block does not exist anymore, then it's not an error.
if (!volume.getDataset().contains(block)) {
LOG.debug("Volume {}: block {} is no longer in the dataset.",
volume.getBasePath(), block);
volume, block);
return;
}
// If the block exists, the exception may due to a race with write:
@ -278,11 +278,10 @@ public class VolumeScanner extends Thread {
if (e instanceof FileNotFoundException ) {
LOG.info("Volume {}: verification failed for {} because of " +
"FileNotFoundException. This may be due to a race with write.",
volume.getBasePath(), block);
volume, block);
return;
}
LOG.warn("Reporting bad " + block + " with volume "
+ volume.getBasePath(), e);
LOG.warn("Reporting bad {} on {}", block, volume);
try {
scanner.datanode.reportBadBlocks(block, volume);
} catch (IOException ie) {
@ -305,7 +304,7 @@ public class VolumeScanner extends Thread {
handler = new ScanResultHandler();
}
this.resultHandler = handler;
setName("VolumeScannerThread(" + volume.getBasePath() + ")");
setName("VolumeScannerThread(" + volume + ")");
setDaemon(true);
}
@ -376,7 +375,7 @@ public class VolumeScanner extends Thread {
BlockIterator iter = blockIters.get(idx);
if (!iter.atEnd()) {
LOG.info("Now scanning bpid {} on volume {}",
iter.getBlockPoolId(), volume.getBasePath());
iter.getBlockPoolId(), volume);
curBlockIter = iter;
return 0L;
}
@ -385,7 +384,7 @@ public class VolumeScanner extends Thread {
if (waitMs <= 0) {
iter.rewind();
LOG.info("Now rescanning bpid {} on volume {}, after more than " +
"{} hour(s)", iter.getBlockPoolId(), volume.getBasePath(),
"{} hour(s)", iter.getBlockPoolId(), volume,
TimeUnit.HOURS.convert(conf.scanPeriodMs, TimeUnit.MILLISECONDS));
curBlockIter = iter;
return 0L;
@ -416,16 +415,16 @@ public class VolumeScanner extends Thread {
cblock.getBlockPoolId(), cblock.getBlockId());
if (b == null) {
LOG.info("Replica {} was not found in the VolumeMap for volume {}",
cblock, volume.getBasePath());
cblock, volume);
} else {
block = new ExtendedBlock(cblock.getBlockPoolId(), b);
}
} catch (FileNotFoundException e) {
LOG.info("FileNotFoundException while finding block {} on volume {}",
cblock, volume.getBasePath());
cblock, volume);
} catch (IOException e) {
LOG.warn("I/O error while finding block {} on volume {}",
cblock, volume.getBasePath());
cblock, volume);
}
if (block == null) {
return -1; // block not found.
@ -642,7 +641,7 @@ public class VolumeScanner extends Thread {
@Override
public String toString() {
return "VolumeScanner(" + volume.getBasePath() +
return "VolumeScanner(" + volume +
", " + volume.getStorageID() + ")";
}

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -206,7 +207,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @param clearFailure set true to clear the failure information about the
* volumes.
*/
void removeVolumes(Set<File> volumes, boolean clearFailure);
void removeVolumes(Collection<StorageLocation> volumes, boolean clearFailure);
/** @return a storage with the given storage ID */
DatanodeStorage getStorage(final String storageUuid);
@ -482,7 +483,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Check if all the data directories are healthy
* @return A set of unhealthy data directories.
*/
Set<File> checkDataDir();
Set<StorageLocation> checkDataDir();
/**
* Shutdown the FSDataset

View File

@ -20,10 +20,20 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
/**
* This is an interface for the underlying volume.
@ -48,14 +58,14 @@ public interface FsVolumeSpi {
long getAvailable() throws IOException;
/** @return the base path to the volume */
String getBasePath();
URI getBaseURI();
/** @return the path to the volume */
String getPath(String bpid) throws IOException;
DF getUsageStats(Configuration conf);
/** @return the directory for the finalized blocks in the block pool. */
File getFinalizedDir(String bpid) throws IOException;
/** @return the {@link StorageLocation} to the volume */
StorageLocation getStorageLocation();
/** @return the {@link StorageType} of the volume */
StorageType getStorageType();
/** Returns true if the volume is NOT backed by persistent storage. */
@ -186,4 +196,216 @@ public interface FsVolumeSpi {
* Get the FSDatasetSpi which this volume is a part of.
*/
FsDatasetSpi getDataset();
/**
* Tracks the files and other information related to a block on the disk
* Missing file is indicated by setting the corresponding member
* to null.
*
* Because millions of these structures may be created, we try to save
* memory here. So instead of storing full paths, we store path suffixes.
* The block file, if it exists, will have a path like this:
* <volume_base_path>/<block_path>
* So we don't need to store the volume path, since we already know what the
* volume is.
*
* The metadata file, if it exists, will have a path like this:
* <volume_base_path>/<block_path>_<genstamp>.meta
* So if we have a block file, there isn't any need to store the block path
* again.
*
* The accessor functions take care of these manipulations.
*/
public static class ScanInfo implements Comparable<ScanInfo> {
private final long blockId;
/**
* The block file path, relative to the volume's base directory.
* If there was no block file found, this may be null. If 'vol'
* is null, then this is the full path of the block file.
*/
private final String blockSuffix;
/**
* The suffix of the meta file path relative to the block file.
* If blockSuffix is null, then this will be the entire path relative
* to the volume base directory, or an absolute path if vol is also
* null.
*/
private final String metaSuffix;
private final FsVolumeSpi volume;
/**
* Get the file's length in async block scan
*/
private final long blockFileLength;
private final static Pattern CONDENSED_PATH_REGEX =
Pattern.compile("(?<!^)(\\\\|/){2,}");
private final static String QUOTED_FILE_SEPARATOR =
Matcher.quoteReplacement(File.separator);
/**
* Get the most condensed version of the path.
*
* For example, the condensed version of /foo//bar is /foo/bar
* Unlike {@link File#getCanonicalPath()}, this will never perform I/O
* on the filesystem.
*
* @param path the path to condense
* @return the condensed path
*/
private static String getCondensedPath(String path) {
return CONDENSED_PATH_REGEX.matcher(path).
replaceAll(QUOTED_FILE_SEPARATOR);
}
/**
* Get a path suffix.
*
* @param f The file to get the suffix for.
* @param prefix The prefix we're stripping off.
*
* @return A suffix such that prefix + suffix = path to f
*/
private static String getSuffix(File f, String prefix) {
String fullPath = getCondensedPath(f.getAbsolutePath());
if (fullPath.startsWith(prefix)) {
return fullPath.substring(prefix.length());
}
throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
}
/**
* Create a ScanInfo object for a block. This constructor will examine
* the block data and meta-data files.
*
* @param blockId the block ID
* @param blockFile the path to the block data file
* @param metaFile the path to the block meta-data file
* @param vol the volume that contains the block
*/
public ScanInfo(long blockId, File blockFile, File metaFile,
FsVolumeSpi vol) {
this.blockId = blockId;
String condensedVolPath =
(vol == null || vol.getBaseURI() == null) ? null :
getCondensedPath(new File(vol.getBaseURI()).getAbsolutePath());
this.blockSuffix = blockFile == null ? null :
getSuffix(blockFile, condensedVolPath);
this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
if (metaFile == null) {
this.metaSuffix = null;
} else if (blockFile == null) {
this.metaSuffix = getSuffix(metaFile, condensedVolPath);
} else {
this.metaSuffix = getSuffix(metaFile,
condensedVolPath + blockSuffix);
}
this.volume = vol;
}
/**
* Returns the block data file.
*
* @return the block data file
*/
public File getBlockFile() {
return (blockSuffix == null) ? null :
new File(new File(volume.getBaseURI()).getAbsolutePath(), blockSuffix);
}
/**
* Return the length of the data block. The length returned is the length
* cached when this object was created.
*
* @return the length of the data block
*/
public long getBlockFileLength() {
return blockFileLength;
}
/**
* Returns the block meta data file or null if there isn't one.
*
* @return the block meta data file
*/
public File getMetaFile() {
if (metaSuffix == null) {
return null;
} else if (blockSuffix == null) {
return new File(new File(volume.getBaseURI()).getAbsolutePath(),
metaSuffix);
} else {
return new File(new File(volume.getBaseURI()).getAbsolutePath(),
blockSuffix + metaSuffix);
}
}
/**
* Returns the block ID.
*
* @return the block ID
*/
public long getBlockId() {
return blockId;
}
/**
* Returns the volume that contains the block that this object describes.
*
* @return the volume
*/
public FsVolumeSpi getVolume() {
return volume;
}
@Override // Comparable
public int compareTo(ScanInfo b) {
if (blockId < b.blockId) {
return -1;
} else if (blockId == b.blockId) {
return 0;
} else {
return 1;
}
}
@Override // Object
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ScanInfo)) {
return false;
}
return blockId == ((ScanInfo) o).blockId;
}
@Override // Object
public int hashCode() {
return (int)(blockId^(blockId>>>32));
}
public long getGenStamp() {
return metaSuffix != null ? Block.getGenerationStamp(
getMetaFile().getName()) :
HdfsConstants.GRANDFATHER_GENERATION_STAMP;
}
}
/**
* Compile a list of {@link ScanInfo} for the blocks in
* the block pool with id {@code bpid}.
*
* @param bpid block pool id to scan
* @param report the list onto which blocks reports are placed
* @param reportCompiler
* @throws IOException
*/
LinkedList<ScanInfo> compileReport(String bpid,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException, IOException;
}

View File

@ -71,8 +71,8 @@ class FsDatasetAsyncDiskService {
private final DataNode datanode;
private final FsDatasetImpl fsdatasetImpl;
private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
private Map<String, ThreadPoolExecutor> executors
= new HashMap<String, ThreadPoolExecutor>();
private Map<String, Set<Long>> deletedBlockIds
= new HashMap<String, Set<Long>>();
private static final int MAX_DELETED_BLOCKS = 64;
@ -91,7 +91,7 @@ class FsDatasetAsyncDiskService {
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
}
private void addExecutorForVolume(final File volume) {
private void addExecutorForVolume(final FsVolumeImpl volume) {
ThreadFactory threadFactory = new ThreadFactory() {
int counter = 0;
@ -115,18 +115,21 @@ class FsDatasetAsyncDiskService {
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
executors.put(volume, executor);
executors.put(volume.getStorageID(), executor);
}
/**
* Starts AsyncDiskService for a new volume
* @param volume the root of the new data volume.
*/
synchronized void addVolume(File volume) {
synchronized void addVolume(FsVolumeImpl volume) {
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
if (volume == null) {
throw new RuntimeException("Attempt to add a null volume");
}
ThreadPoolExecutor executor = executors.get(volume.getStorageID());
if (executor != null) {
throw new RuntimeException("Volume " + volume + " is already existed.");
}
@ -137,17 +140,17 @@ class FsDatasetAsyncDiskService {
* Stops AsyncDiskService for a volume.
* @param volume the root of the volume.
*/
synchronized void removeVolume(File volume) {
synchronized void removeVolume(String storageId) {
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
ThreadPoolExecutor executor = executors.get(storageId);
if (executor == null) {
throw new RuntimeException("Can not find volume " + volume
+ " to remove.");
throw new RuntimeException("Can not find volume with storageId "
+ storageId + " to remove.");
} else {
executor.shutdown();
executors.remove(volume);
executors.remove(storageId);
}
}
@ -162,13 +165,16 @@ class FsDatasetAsyncDiskService {
/**
* Execute the task sometime in the future, using ThreadPools.
*/
synchronized void execute(File root, Runnable task) {
synchronized void execute(FsVolumeImpl volume, Runnable task) {
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(root);
if (volume == null) {
throw new RuntimeException("A null volume does not have a executor");
}
ThreadPoolExecutor executor = executors.get(volume.getStorageID());
if (executor == null) {
throw new RuntimeException("Cannot find root " + root
throw new RuntimeException("Cannot find volume " + volume
+ " for execution of task " + task);
} else {
executor.execute(task);
@ -185,7 +191,7 @@ class FsDatasetAsyncDiskService {
} else {
LOG.info("Shutting down all async disk service threads");
for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
for (Map.Entry<String, ThreadPoolExecutor> e : executors.entrySet()) {
e.getValue().shutdown();
}
// clear the executor map so that calling execute again will fail.
@ -198,7 +204,7 @@ class FsDatasetAsyncDiskService {
public void submitSyncFileRangeRequest(FsVolumeImpl volume,
final FileDescriptor fd, final long offset, final long nbytes,
final int flags) {
execute(volume.getCurrentDir(), new Runnable() {
execute(volume, new Runnable() {
@Override
public void run() {
try {
@ -220,7 +226,7 @@ class FsDatasetAsyncDiskService {
+ " replica " + replicaToDelete + " for deletion");
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
volumeRef, replicaToDelete, block, trashDirectory);
execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
execute(((FsVolumeImpl) volumeRef.getVolume()), deletionTask);
}
/** A task for deleting a block file and its associated meta file, as well

View File

@ -361,20 +361,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
Collection<StorageLocation> dataLocations, DataStorage storage) {
Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize(
Set<StorageLocation> failedLocationSet = Sets.newHashSetWithExpectedSize(
dataLocations.size());
for (StorageLocation sl: dataLocations) {
failedLocationSet.add(sl.getFile().getAbsolutePath());
LOG.info("Adding to failedLocationSet " + sl);
failedLocationSet.add(sl);
}
for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
it.hasNext(); ) {
Storage.StorageDirectory sd = it.next();
failedLocationSet.remove(sd.getRoot().getAbsolutePath());
failedLocationSet.remove(sd.getStorageLocation());
LOG.info("Removing from failedLocationSet " + sd.getStorageLocation());
}
List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity(
failedLocationSet.size());
long failureDate = Time.now();
for (String failedStorageLocation: failedLocationSet) {
for (StorageLocation failedStorageLocation: failedLocationSet) {
volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation,
failureDate));
}
@ -403,49 +405,55 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
asyncDiskService.addVolume((FsVolumeImpl) ref.getVolume());
volumes.addVolume(ref);
}
}
private void addVolume(Collection<StorageLocation> dataLocations,
Storage.StorageDirectory sd) throws IOException {
final File dir = sd.getCurrentDir();
final StorageType storageType =
getStorageTypeFromLocations(dataLocations, sd.getRoot());
final StorageLocation storageLocation = sd.getStorageLocation();
// If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
// nothing needed to be rolled back to make various data structures, e.g.,
// storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
FsVolumeImpl fsVolume = new FsVolumeImplBuilder()
.setDataset(this)
.setStorageID(sd.getStorageUuid())
.setStorageDirectory(sd)
.setConf(this.conf)
.build();
FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
activateVolume(tempVolumeMap, sd, storageType, ref);
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
LOG.info("Added volume - " + storageLocation + ", StorageType: " +
storageLocation.getStorageType());
}
@VisibleForTesting
public FsVolumeImpl createFsVolume(String storageUuid, File currentDir,
StorageType storageType) throws IOException {
return new FsVolumeImpl(this, storageUuid, currentDir, conf, storageType);
public FsVolumeImpl createFsVolume(String storageUuid,
Storage.StorageDirectory sd,
final StorageLocation location) throws IOException {
return new FsVolumeImplBuilder()
.setDataset(this)
.setStorageID(storageUuid)
.setStorageDirectory(sd)
.setConf(conf)
.build();
}
@Override
public void addVolume(final StorageLocation location,
final List<NamespaceInfo> nsInfos)
throws IOException {
final File dir = location.getFile();
// Prepare volume in DataStorage
final DataStorage.VolumeBuilder builder;
try {
builder = dataStorage.prepareVolume(datanode, location.getFile(), nsInfos);
builder = dataStorage.prepareVolume(datanode, location, nsInfos);
} catch (IOException e) {
volumes.addVolumeFailureInfo(new VolumeFailureInfo(
location.getFile().getAbsolutePath(), Time.now()));
volumes.addVolumeFailureInfo(new VolumeFailureInfo(location, Time.now()));
throw e;
}
@ -453,7 +461,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume =
createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
createFsVolume(sd.getStorageUuid(), sd, location);
final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
ArrayList<IOException> exceptions = Lists.newArrayList();
@ -482,34 +490,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
builder.build();
activateVolume(tempVolumeMap, sd, storageType, ref);
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
LOG.info("Added volume - " + location + ", StorageType: " + storageType);
}
/**
* Removes a set of volumes from FsDataset.
* @param volumesToRemove a set of absolute root path of each volume.
* @param storageLocationsToRemove a set of
* {@link StorageLocation}s for each volume.
* @param clearFailure set true to clear failure information.
*/
@Override
public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
// Make sure that all volumes are absolute path.
for (File vol : volumesToRemove) {
Preconditions.checkArgument(vol.isAbsolute(),
String.format("%s is not absolute path.", vol.getPath()));
}
public void removeVolumes(
Collection<StorageLocation> storageLocationsToRemove,
boolean clearFailure) {
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
List<String> storageToRemove = new ArrayList<>();
try (AutoCloseableLock lock = datasetLock.acquire()) {
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
final File absRoot = sd.getRoot().getAbsoluteFile();
if (volumesToRemove.contains(absRoot)) {
LOG.info("Removing " + absRoot + " from FsDataset.");
final StorageLocation sdLocation = sd.getStorageLocation();
LOG.info("Checking removing StorageLocation " +
sdLocation + " with id " + sd.getStorageUuid());
if (storageLocationsToRemove.contains(sdLocation)) {
LOG.info("Removing StorageLocation " + sdLocation + " with id " +
sd.getStorageUuid() + " from FsDataset.");
// Disable the volume from the service.
asyncDiskService.removeVolume(sd.getCurrentDir());
volumes.removeVolume(absRoot, clearFailure);
asyncDiskService.removeVolume(sd.getStorageUuid());
volumes.removeVolume(sdLocation, clearFailure);
volumes.waitVolumeRemoved(5000, datasetLockCondition);
// Removed all replica information for the blocks on the volume.
@ -517,12 +524,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// not scan disks.
for (String bpid : volumeMap.getBlockPoolList()) {
List<ReplicaInfo> blocks = new ArrayList<>();
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
it.hasNext(); ) {
for (Iterator<ReplicaInfo> it =
volumeMap.replicas(bpid).iterator(); it.hasNext();) {
ReplicaInfo block = it.next();
final File absBasePath =
new File(block.getVolume().getBasePath()).getAbsoluteFile();
if (absBasePath.equals(absRoot)) {
final StorageLocation blockStorageLocation =
block.getVolume().getStorageLocation();
LOG.info("checking for block " + block.getBlockId() +
" with storageLocation " + blockStorageLocation);
if (blockStorageLocation.equals(sdLocation)) {
blocks.add(block);
it.remove();
}
@ -625,7 +634,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
List<String> failedStorageLocations = Lists.newArrayListWithCapacity(
infos.length);
for (VolumeFailureInfo info: infos) {
failedStorageLocations.add(info.getFailedStorageLocation());
failedStorageLocations.add(
info.getFailedStorageLocation().getFile().getAbsolutePath());
}
return failedStorageLocations.toArray(
new String[failedStorageLocations.size()]);
@ -663,7 +673,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long lastVolumeFailureDate = 0;
long estimatedCapacityLostTotal = 0;
for (VolumeFailureInfo info: infos) {
failedStorageLocations.add(info.getFailedStorageLocation());
failedStorageLocations.add(
info.getFailedStorageLocation().getFile().getAbsolutePath());
long failureDate = info.getFailureDate();
if (failureDate > lastVolumeFailureDate) {
lastVolumeFailureDate = failureDate;
@ -960,25 +971,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
// Copy files to temp dir first
File[] blockFiles = copyBlockFiles(block.getBlockId(),
block.getGenerationStamp(), replicaInfo,
targetVolume.getTmpDir(block.getBlockPoolId()),
replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
ReplicaInfo newReplicaInfo = targetVolume.moveBlockToTmpLocation(block,
replicaInfo, smallBufferSize, conf);
ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
.setBlockId(replicaInfo.getBlockId())
.setGenerationStamp(replicaInfo.getGenerationStamp())
.setFsVolume(targetVolume)
.setDirectoryToUse(blockFiles[0].getParentFile())
.setBytesToReserve(0)
.build();
newReplicaInfo.setNumBytes(blockFiles[1].length());
// Finalize the copied files
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
try (AutoCloseableLock lock = datasetLock.acquire()) {
// Increment numBlocks here as this block moved without knowing to BPS
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
volume.incrNumBlocks(block.getBlockPoolId());
}
removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
@ -2072,7 +2073,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @return the failed volumes. Returns null if no volume failed.
*/
@Override // FsDatasetSpi
public Set<File> checkDataDir() {
public Set<StorageLocation> checkDataDir() {
return volumes.checkDirs();
}
@ -2250,9 +2251,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setFsVolume(vol)
.setDirectoryToUse(diskFile.getParentFile())
.build();
((FsVolumeImpl) vol).getBlockPoolSlice(bpid)
.resolveDuplicateReplicas(
memBlockInfo, diskBlockInfo, volumeMap);
((FsVolumeImpl) vol).resolveDuplicateReplicas(bpid,
memBlockInfo, diskBlockInfo, volumeMap);
}
} else {
if (!diskFile.delete()) {
@ -2803,15 +2803,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Add thread for DISK volume if RamDisk is configured
if (ramDiskConfigured &&
asyncLazyPersistService != null &&
!asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
asyncLazyPersistService.addVolume(v.getCurrentDir());
!asyncLazyPersistService.queryVolume(v)) {
asyncLazyPersistService.addVolume(v);
}
// Remove thread for DISK volume if RamDisk is not configured
if (!ramDiskConfigured &&
asyncLazyPersistService != null &&
asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
asyncLazyPersistService.removeVolume(v.getCurrentDir());
asyncLazyPersistService.queryVolume(v)) {
asyncLazyPersistService.removeVolume(v);
}
}
@ -2946,11 +2946,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Move the replica from lazyPersist/ to finalized/ on
// the target volume
BlockPoolSlice bpSlice =
replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
newReplicaInfo =
bpSlice.activateSavedReplica(replicaInfo, replicaState);
replicaState.getLazyPersistVolume().activateSavedReplica(bpid,
replicaInfo, replicaState);
// Update the volumeMap entry.
volumeMap.add(bpid, newReplicaInfo);

View File

@ -23,11 +23,13 @@ import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -56,13 +58,18 @@ import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.BlockDirFilter;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.CloseableReferenceCount;
@ -102,8 +109,14 @@ public class FsVolumeImpl implements FsVolumeSpi {
private final StorageType storageType;
private final Map<String, BlockPoolSlice> bpSlices
= new ConcurrentHashMap<String, BlockPoolSlice>();
// Refers to the base StorageLocation used to construct this volume
// (i.e., does not include STORAGE_DIR_CURRENT in
// <location>/STORAGE_DIR_CURRENT/)
private final StorageLocation storageLocation;
private final File currentDir; // <StorageDirectory>/current
private final DF usage;
private final DF usage;
private final long reserved;
private CloseableReferenceCount reference = new CloseableReferenceCount();
@ -124,19 +137,25 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/
protected ThreadPoolExecutor cacheExecutor;
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
Configuration conf, StorageType storageType) throws IOException {
FsVolumeImpl(FsDatasetImpl dataset, String storageID, StorageDirectory sd,
Configuration conf) throws IOException {
if (sd.getStorageLocation() == null) {
throw new IOException("StorageLocation specified for storage directory " +
sd + " is null");
}
this.dataset = dataset;
this.storageID = storageID;
this.reservedForReplicas = new AtomicLong(0L);
this.storageLocation = sd.getStorageLocation();
this.currentDir = sd.getCurrentDir();
File parent = currentDir.getParentFile();
this.usage = new DF(parent, conf);
this.storageType = storageLocation.getStorageType();
this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY
+ "." + StringUtils.toLowerCase(storageType.toString()), conf.getLong(
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT));
this.reservedForReplicas = new AtomicLong(0L);
this.currentDir = currentDir;
File parent = currentDir.getParentFile();
this.usage = new DF(parent, conf);
this.storageType = storageType;
this.configuredCapacity = -1;
this.conf = conf;
cacheExecutor = initializeCacheExecutor(parent);
@ -285,19 +304,20 @@ public class FsVolumeImpl implements FsVolumeSpi {
return true;
}
@VisibleForTesting
File getCurrentDir() {
return currentDir;
}
File getRbwDir(String bpid) throws IOException {
protected File getRbwDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getRbwDir();
}
File getLazyPersistDir(String bpid) throws IOException {
protected File getLazyPersistDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getLazypersistDir();
}
File getTmpDir(String bpid) throws IOException {
protected File getTmpDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getTmpDir();
}
@ -448,6 +468,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
return reserved;
}
@VisibleForTesting
BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp == null) {
@ -457,21 +478,33 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
@Override
public String getBasePath() {
return currentDir.getParent();
public URI getBaseURI() {
return new File(currentDir.getParent()).toURI();
}
@Override
public DF getUsageStats(Configuration conf) {
if (currentDir != null) {
try {
return new DF(new File(currentDir.getParent()), conf);
} catch (IOException e) {
LOG.error("Unable to get disk statistics for volume " + this);
}
}
return null;
}
@Override
public StorageLocation getStorageLocation() {
return storageLocation;
}
@Override
public boolean isTransientStorage() {
return storageType.isTransient();
}
@Override
public String getPath(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
}
@Override
@VisibleForTesting
public File getFinalizedDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getFinalizedDir();
}
@ -951,7 +984,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
@Override
public String toString() {
return currentDir.getAbsolutePath();
return currentDir != null ? currentDir.getParent() : "NULL";
}
void shutdown() {
@ -1189,5 +1222,167 @@ public class FsVolumeImpl implements FsVolumeSpi {
dstBlockFile, true, DFSUtilClient.getSmallBufferSize(conf), conf);
}
@Override
public LinkedList<ScanInfo> compileReport(String bpid,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException, IOException {
return compileReport(getFinalizedDir(bpid),
getFinalizedDir(bpid), report, reportCompiler);
}
private LinkedList<ScanInfo> compileReport(File bpFinalizedDir,
File dir, LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException {
reportCompiler.throttle();
List <String> fileNames;
try {
fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
} catch (IOException ioe) {
LOG.warn("Exception occured while compiling report: ", ioe);
// Initiate a check on disk failure.
dataset.datanode.checkDiskErrorAsync();
// Ignore this directory and proceed.
return report;
}
Collections.sort(fileNames);
/*
* Assumption: In the sorted list of files block file appears immediately
* before block metadata file. This is true for the current naming
* convention for block file blk_<blockid> and meta file
* blk_<blockid>_<genstamp>.meta
*/
for (int i = 0; i < fileNames.size(); i++) {
// Make sure this thread can make a timely exit. With a low throttle
// rate, completing a run can take a looooong time.
if (Thread.interrupted()) {
throw new InterruptedException();
}
File file = new File(dir, fileNames.get(i));
if (file.isDirectory()) {
compileReport(bpFinalizedDir, file, report, reportCompiler);
continue;
}
if (!Block.isBlockFilename(file)) {
if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
long blockId = Block.getBlockId(file.getName());
verifyFileLocation(file.getParentFile(), bpFinalizedDir,
blockId);
report.add(new ScanInfo(blockId, null, file, this));
}
continue;
}
File blockFile = file;
long blockId = Block.filename2id(file.getName());
File metaFile = null;
// Skip all the files that start with block name until
// getting to the metafile for the block
while (i + 1 < fileNames.size()) {
File blkMetaFile = new File(dir, fileNames.get(i + 1));
if (!(blkMetaFile.isFile()
&& blkMetaFile.getName().startsWith(blockFile.getName()))) {
break;
}
i++;
if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
metaFile = blkMetaFile;
break;
}
}
verifyFileLocation(blockFile, bpFinalizedDir, blockId);
report.add(new ScanInfo(blockId, blockFile, metaFile, this));
}
return report;
}
/**
* Helper method to determine if a file name is consistent with a block.
* meta-data file
*
* @param blockId the block ID
* @param metaFile the file to check
* @return whether the file name is a block meta-data file name
*/
private static boolean isBlockMetaFile(String blockId, String metaFile) {
return metaFile.startsWith(blockId)
&& metaFile.endsWith(Block.METADATA_EXTENSION);
}
/**
* Verify whether the actual directory location of block file has the
* expected directory path computed using its block ID.
*/
private void verifyFileLocation(File actualBlockFile,
File bpFinalizedDir, long blockId) {
File expectedBlockDir =
DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
File actualBlockDir = actualBlockFile.getParentFile();
if (actualBlockDir.compareTo(expectedBlockDir) != 0) {
LOG.warn("Block: " + blockId +
" found in invalid directory. Expected directory: " +
expectedBlockDir + ". Actual directory: " + actualBlockDir);
}
}
public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block,
ReplicaInfo replicaInfo,
int smallBufferSize,
Configuration conf) throws IOException {
File[] blockFiles = FsDatasetImpl.copyBlockFiles(block.getBlockId(),
block.getGenerationStamp(), replicaInfo,
getTmpDir(block.getBlockPoolId()),
replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
.setBlockId(replicaInfo.getBlockId())
.setGenerationStamp(replicaInfo.getGenerationStamp())
.setFsVolume(this)
.setDirectoryToUse(blockFiles[0].getParentFile())
.setBytesToReserve(0)
.build();
newReplicaInfo.setNumBytes(blockFiles[1].length());
return newReplicaInfo;
}
public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
long genStamp,
ReplicaInfo replicaInfo,
int smallBufferSize,
Configuration conf) throws IOException {
File lazyPersistDir = getLazyPersistDir(bpId);
if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
throw new IOException("LazyWriter fail to find or " +
"create lazy persist dir: " + lazyPersistDir.toString());
}
// No FsDatasetImpl lock for the file copy
File[] targetFiles = FsDatasetImpl.copyBlockFiles(
blockId, genStamp, replicaInfo, lazyPersistDir, true,
smallBufferSize, conf);
return targetFiles;
}
public void incrNumBlocks(String bpid) throws IOException {
getBlockPoolSlice(bpid).incrNumBlocks();
}
public void resolveDuplicateReplicas(String bpid, ReplicaInfo memBlockInfo,
ReplicaInfo diskBlockInfo, ReplicaMap volumeMap) throws IOException {
getBlockPoolSlice(bpid).resolveDuplicateReplicas(
memBlockInfo, diskBlockInfo, volumeMap);
}
public ReplicaInfo activateSavedReplica(String bpid,
ReplicaInfo replicaInfo, RamDiskReplica replicaState) throws IOException {
return getBlockPoolSlice(bpid).activateSavedReplica(replicaInfo,
replicaState);
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
/**
* This class is to be used as a builder for {@link FsVolumeImpl} objects.
*/
public class FsVolumeImplBuilder {
private FsDatasetImpl dataset;
private String storageID;
private StorageDirectory sd;
private Configuration conf;
public FsVolumeImplBuilder() {
dataset = null;
storageID = null;
sd = null;
conf = null;
}
FsVolumeImplBuilder setDataset(FsDatasetImpl dataset) {
this.dataset = dataset;
return this;
}
FsVolumeImplBuilder setStorageID(String id) {
this.storageID = id;
return this;
}
FsVolumeImplBuilder setStorageDirectory(StorageDirectory sd) {
this.sd = sd;
return this;
}
FsVolumeImplBuilder setConf(Configuration conf) {
this.conf = conf;
return this;
}
FsVolumeImpl build() throws IOException {
return new FsVolumeImpl(dataset, storageID, sd, conf);
}
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
@ -41,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
@ -51,8 +51,10 @@ class FsVolumeList {
private final CopyOnWriteArrayList<FsVolumeImpl> volumes =
new CopyOnWriteArrayList<>();
// Tracks volume failures, sorted by volume path.
private final Map<String, VolumeFailureInfo> volumeFailureInfos =
Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
// map from volume storageID to the volume failure info
private final Map<StorageLocation, VolumeFailureInfo> volumeFailureInfos =
Collections.synchronizedMap(
new TreeMap<StorageLocation, VolumeFailureInfo>());
private final ConcurrentLinkedQueue<FsVolumeImpl> volumesBeingRemoved =
new ConcurrentLinkedQueue<>();
private final AutoCloseableLock checkDirsLock;
@ -234,10 +236,9 @@ class FsVolumeList {
*
* @return list of all the failed volumes.
*/
Set<File> checkDirs() {
Set<StorageLocation> checkDirs() {
try (AutoCloseableLock lock = checkDirsLock.acquire()) {
Set<File> failedVols = null;
Set<StorageLocation> failedLocations = null;
// Make a copy of volumes for performing modification
final List<FsVolumeImpl> volumeList = getVolumes();
@ -247,10 +248,10 @@ class FsVolumeList {
fsv.checkDirs();
} catch (DiskErrorException e) {
FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
if (failedVols == null) {
failedVols = new HashSet<>(1);
if (failedLocations == null) {
failedLocations = new HashSet<>(1);
}
failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
failedLocations.add(fsv.getStorageLocation());
addVolumeFailureInfo(fsv);
removeVolume(fsv);
} catch (ClosedChannelException e) {
@ -261,13 +262,13 @@ class FsVolumeList {
}
}
if (failedVols != null && failedVols.size() > 0) {
FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size()
+ " failure volumes.");
if (failedLocations != null && failedLocations.size() > 0) {
FsDatasetImpl.LOG.warn("Completed checkDirs. Found " +
failedLocations.size() + " failure volumes.");
}
waitVolumeRemoved(5000, checkDirsLockCondition);
return failedVols;
return failedLocations;
}
}
@ -315,7 +316,7 @@ class FsVolumeList {
}
// If the volume is used to replace a failed volume, it needs to reset the
// volume failure info for this volume.
removeVolumeFailureInfo(new File(volume.getBasePath()));
removeVolumeFailureInfo(volume.getStorageLocation());
FsDatasetImpl.LOG.info("Added new volume: " +
volume.getStorageID());
}
@ -351,16 +352,15 @@ class FsVolumeList {
* @param volume the volume to be removed.
* @param clearFailure set true to remove failure info for this volume.
*/
void removeVolume(File volume, boolean clearFailure) {
void removeVolume(StorageLocation storageLocation, boolean clearFailure) {
for (FsVolumeImpl fsVolume : volumes) {
String basePath = new File(fsVolume.getBasePath()).getAbsolutePath();
String targetPath = volume.getAbsolutePath();
if (basePath.equals(targetPath)) {
StorageLocation baseLocation = fsVolume.getStorageLocation();
if (baseLocation.equals(storageLocation)) {
removeVolume(fsVolume);
}
}
if (clearFailure) {
removeVolumeFailureInfo(volume);
removeVolumeFailureInfo(storageLocation);
}
}
@ -394,13 +394,13 @@ class FsVolumeList {
private void addVolumeFailureInfo(FsVolumeImpl vol) {
addVolumeFailureInfo(new VolumeFailureInfo(
new File(vol.getBasePath()).getAbsolutePath(),
vol.getStorageLocation(),
Time.now(),
vol.getCapacity()));
}
private void removeVolumeFailureInfo(File vol) {
volumeFailureInfos.remove(vol.getAbsolutePath());
private void removeVolumeFailureInfo(StorageLocation location) {
volumeFailureInfos.remove(location);
}
void addBlockPool(final String bpid, final Configuration conf) throws IOException {

View File

@ -58,8 +58,8 @@ class RamDiskAsyncLazyPersistService {
private final Configuration conf;
private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
private Map<String, ThreadPoolExecutor> executors
= new HashMap<String, ThreadPoolExecutor>();
private final static HdfsConfiguration EMPTY_HDFS_CONF = new HdfsConfiguration();
/**
@ -75,13 +75,14 @@ class RamDiskAsyncLazyPersistService {
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
}
private void addExecutorForVolume(final File volume) {
private void addExecutorForVolume(final String storageId) {
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(threadGroup, r);
t.setName("Async RamDisk lazy persist worker for volume " + volume);
t.setName("Async RamDisk lazy persist worker " +
" for volume with id " + storageId);
return t;
}
};
@ -93,39 +94,41 @@ class RamDiskAsyncLazyPersistService {
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
executors.put(volume, executor);
executors.put(storageId, executor);
}
/**
* Starts AsyncLazyPersistService for a new volume
* @param volume the root of the new data volume.
*/
synchronized void addVolume(File volume) {
synchronized void addVolume(FsVolumeImpl volume) {
String storageId = volume.getStorageID();
if (executors == null) {
throw new RuntimeException("AsyncLazyPersistService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
ThreadPoolExecutor executor = executors.get(storageId);
if (executor != null) {
throw new RuntimeException("Volume " + volume + " is already existed.");
}
addExecutorForVolume(volume);
addExecutorForVolume(storageId);
}
/**
* Stops AsyncLazyPersistService for a volume.
* @param volume the root of the volume.
*/
synchronized void removeVolume(File volume) {
synchronized void removeVolume(FsVolumeImpl volume) {
String storageId = volume.getStorageID();
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
ThreadPoolExecutor executor = executors.get(storageId);
if (executor == null) {
throw new RuntimeException("Can not find volume " + volume
+ " to remove.");
throw new RuntimeException("Can not find volume with storage id " +
storageId + " to remove.");
} else {
executor.shutdown();
executors.remove(volume);
executors.remove(storageId);
}
}
@ -135,25 +138,28 @@ class RamDiskAsyncLazyPersistService {
* @return true if there is one thread pool for the volume
* false otherwise
*/
synchronized boolean queryVolume(File volume) {
synchronized boolean queryVolume(FsVolumeImpl volume) {
String storageId = volume.getStorageID();
if (executors == null) {
throw new RuntimeException("AsyncLazyPersistService is already shutdown");
throw new RuntimeException(
"AsyncLazyPersistService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
ThreadPoolExecutor executor = executors.get(storageId);
return (executor != null);
}
/**
* Execute the task sometime in the future, using ThreadPools.
*/
synchronized void execute(File root, Runnable task) {
synchronized void execute(String storageId, Runnable task) {
if (executors == null) {
throw new RuntimeException("AsyncLazyPersistService is already shutdown");
throw new RuntimeException(
"AsyncLazyPersistService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(root);
ThreadPoolExecutor executor = executors.get(storageId);
if (executor == null) {
throw new RuntimeException("Cannot find root " + root
+ " for execution of task " + task);
throw new RuntimeException("Cannot find root storage volume with id " +
storageId + " for execution of task " + task);
} else {
executor.execute(task);
}
@ -169,7 +175,7 @@ class RamDiskAsyncLazyPersistService {
} else {
LOG.info("Shutting down all async lazy persist service threads");
for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
for (Map.Entry<String, ThreadPoolExecutor> e : executors.entrySet()) {
e.getValue().shutdown();
}
// clear the executor map so that calling execute again will fail.
@ -189,18 +195,11 @@ class RamDiskAsyncLazyPersistService {
+ bpId + " block id: " + blockId);
}
FsVolumeImpl volume = (FsVolumeImpl)target.getVolume();
File lazyPersistDir = volume.getLazyPersistDir(bpId);
if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
throw new IOException("LazyWriter fail to find or create lazy persist dir: "
+ lazyPersistDir.toString());
}
ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
bpId, blockId, genStamp, creationTime, replica,
target, lazyPersistDir);
execute(volume.getCurrentDir(), lazyPersistTask);
bpId, blockId, genStamp, creationTime, replica, target);
FsVolumeImpl volume = (FsVolumeImpl)target.getVolume();
execute(volume.getStorageID(), lazyPersistTask);
}
class ReplicaLazyPersistTask implements Runnable {
@ -210,19 +209,17 @@ class RamDiskAsyncLazyPersistService {
private final long creationTime;
private final ReplicaInfo replicaInfo;
private final FsVolumeReference targetVolume;
private final File lazyPersistDir;
ReplicaLazyPersistTask(String bpId, long blockId,
long genStamp, long creationTime,
ReplicaInfo replicaInfo,
FsVolumeReference targetVolume, File lazyPersistDir) {
FsVolumeReference targetVolume) {
this.bpId = bpId;
this.blockId = blockId;
this.genStamp = genStamp;
this.creationTime = creationTime;
this.replicaInfo = replicaInfo;
this.targetVolume = targetVolume;
this.lazyPersistDir = lazyPersistDir;
}
@Override
@ -241,14 +238,14 @@ class RamDiskAsyncLazyPersistService {
final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset();
try (FsVolumeReference ref = this.targetVolume) {
int smallBufferSize = DFSUtilClient.getSmallBufferSize(EMPTY_HDFS_CONF);
// No FsDatasetImpl lock for the file copy
File targetFiles[] = FsDatasetImpl.copyBlockFiles(
blockId, genStamp, replicaInfo, lazyPersistDir, true,
smallBufferSize, conf);
FsVolumeImpl volume = (FsVolumeImpl)ref.getVolume();
File[] targetFiles = volume.copyBlockToLazyPersistLocation(bpId,
blockId, genStamp, replicaInfo, smallBufferSize, conf);
// Lock FsDataSetImpl during onCompleteLazyPersist callback
dataset.onCompleteLazyPersist(bpId, blockId,
creationTime, targetFiles, (FsVolumeImpl)ref.getVolume());
creationTime, targetFiles, volume);
succeeded = true;
} catch (Exception e){
FsDatasetImpl.LOG.warn(

View File

@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
/**
* Tracks information about failure of a data volume.
*/
final class VolumeFailureInfo {
private final String failedStorageLocation;
private final StorageLocation failedStorageLocation;
private final long failureDate;
private final long estimatedCapacityLost;
@ -33,7 +35,8 @@ final class VolumeFailureInfo {
* @param failedStorageLocation storage location that has failed
* @param failureDate date/time of failure in milliseconds since epoch
*/
public VolumeFailureInfo(String failedStorageLocation, long failureDate) {
public VolumeFailureInfo(StorageLocation failedStorageLocation,
long failureDate) {
this(failedStorageLocation, failureDate, 0);
}
@ -44,8 +47,8 @@ final class VolumeFailureInfo {
* @param failureDate date/time of failure in milliseconds since epoch
* @param estimatedCapacityLost estimate of capacity lost in bytes
*/
public VolumeFailureInfo(String failedStorageLocation, long failureDate,
long estimatedCapacityLost) {
public VolumeFailureInfo(StorageLocation failedStorageLocation,
long failureDate, long estimatedCapacityLost) {
this.failedStorageLocation = failedStorageLocation;
this.failureDate = failureDate;
this.estimatedCapacityLost = estimatedCapacityLost;
@ -56,7 +59,7 @@ final class VolumeFailureInfo {
*
* @return storage location that has failed
*/
public String getFailedStorageLocation() {
public StorageLocation getFailedStorageLocation() {
return this.failedStorageLocation;
}

View File

@ -5413,7 +5413,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary();
if (volumeFailureSummary != null) {
innerinfo
.put("failedStorageLocations",
.put("failedStorageIDs",
volumeFailureSummary.getFailedStorageLocations())
.put("lastVolumeFailureDate",
volumeFailureSummary.getLastVolumeFailureDate())

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@ -216,13 +217,13 @@ public class TestNameNodePrunesMissingStorages {
datanodeToRemoveStorageFromIdx++;
}
// Find the volume within the datanode which holds that first storage.
String volumeDirectoryToRemove = null;
StorageLocation volumeLocationToRemove = null;
try (FsVolumeReferences volumes =
datanodeToRemoveStorageFrom.getFSDataset().getFsVolumeReferences()) {
assertEquals(NUM_STORAGES_PER_DN, volumes.size());
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageID().equals(storageIdToRemove)) {
volumeDirectoryToRemove = volume.getBasePath();
volumeLocationToRemove = volume.getStorageLocation();
}
}
};
@ -230,10 +231,11 @@ public class TestNameNodePrunesMissingStorages {
// Replace the volume directory with a regular file, which will
// cause a volume failure. (If we merely removed the directory,
// it would be re-initialized with a new storage ID.)
assertNotNull(volumeDirectoryToRemove);
assertNotNull(volumeLocationToRemove);
datanodeToRemoveStorageFrom.shutdown();
FileUtil.fullyDelete(new File(volumeDirectoryToRemove));
FileOutputStream fos = new FileOutputStream(volumeDirectoryToRemove);
FileUtil.fullyDelete(volumeLocationToRemove.getFile());
FileOutputStream fos = new FileOutputStream(
volumeLocationToRemove.getFile().toString());
try {
fos.write(1);
} finally {
@ -326,7 +328,8 @@ public class TestNameNodePrunesMissingStorages {
dn.getFSDataset().getFsVolumeReferences();
final String newStorageId = DatanodeStorage.generateUuid();
try {
File currentDir = new File(volumeRefs.get(0).getBasePath(), "current");
File currentDir = new File(
volumeRefs.get(0).getStorageLocation().getFile(), "current");
File versionFile = new File(currentDir, "VERSION");
rewriteVersionFile(versionFile, newStorageId);
} finally {

View File

@ -22,7 +22,9 @@ import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@ -38,6 +40,7 @@ import javax.management.StandardMBean;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@ -494,21 +498,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
return storage.getCapacity() - storage.getUsed();
}
@Override
public String getBasePath() {
return null;
}
@Override
public String getPath(String bpid) throws IOException {
return null;
}
@Override
public File getFinalizedDir(String bpid) throws IOException {
return null;
}
@Override
public StorageType getStorageType() {
return null;
@ -546,6 +535,28 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public FsDatasetSpi getDataset() {
throw new UnsupportedOperationException();
}
@Override
public StorageLocation getStorageLocation() {
return null;
}
@Override
public URI getBaseURI() {
return null;
}
@Override
public DF getUsageStats(Configuration conf) {
return null;
}
@Override
public LinkedList<ScanInfo> compileReport(String bpid,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException, IOException {
return null;
}
}
private final Map<String, Map<Block, BInfo>> blockMap
@ -1030,7 +1041,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
public Set<File> checkDataDir() {
public Set<StorageLocation> checkDataDir() {
// nothing to check for simulated data set
return null;
}
@ -1344,7 +1355,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
public synchronized void removeVolumes(Set<File> volumes, boolean clearFailure) {
public synchronized void removeVolumes(Collection<StorageLocation> volumes,
boolean clearFailure) {
throw new UnsupportedOperationException();
}

View File

@ -549,7 +549,8 @@ public class TestBlockScanner {
info.shouldRun = false;
}
ctx.datanode.shutdown();
String vPath = ctx.volumes.get(0).getBasePath();
String vPath = ctx.volumes.get(0).getStorageLocation()
.getFile().getAbsolutePath();
File cursorPath = new File(new File(new File(vPath, "current"),
ctx.bpids[0]), "scanner.cursor");
assertTrue("Failed to find cursor save file in " +

View File

@ -52,7 +52,6 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -519,11 +518,8 @@ public class TestDataNodeHotSwapVolumes {
ExtendedBlock block =
DFSTestUtil.getAllBlocks(fs, testFile).get(1).getBlock();
FsVolumeSpi volumeWithBlock = dn.getFSDataset().getVolume(block);
String basePath = volumeWithBlock.getBasePath();
File storageDir = new File(basePath);
URI fileUri = storageDir.toURI();
String dirWithBlock =
"[" + volumeWithBlock.getStorageType() + "]" + fileUri;
String dirWithBlock = "[" + volumeWithBlock.getStorageType() + "]" +
volumeWithBlock.getStorageLocation().getFile().toURI();
String newDirs = dirWithBlock;
for (String dir : oldDirs) {
if (dirWithBlock.startsWith(dir)) {
@ -581,8 +577,8 @@ public class TestDataNodeHotSwapVolumes {
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
for (FsVolumeSpi volume : volumes) {
assertThat(volume.getBasePath(), is(not(anyOf(
is(newDirs.get(0)), is(newDirs.get(2))))));
assertThat(volume.getStorageLocation().getFile().toString(),
is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
}
}
DataStorage storage = dn.getStorage();
@ -765,7 +761,7 @@ public class TestDataNodeHotSwapVolumes {
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
for (FsVolumeSpi vol : volumes) {
if (vol.getBasePath().equals(basePath.getPath())) {
if (vol.getBaseURI().equals(basePath.toURI())) {
return (FsVolumeImpl) vol;
}
}
@ -810,6 +806,7 @@ public class TestDataNodeHotSwapVolumes {
assertEquals(used, failedVolume.getDfsUsed());
DataNodeTestUtils.restoreDataDirFromFailure(dirToFail);
LOG.info("reconfiguring DN ");
assertThat(
"DN did not update its own config",
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir),

View File

@ -21,7 +21,6 @@ import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@ -254,17 +253,18 @@ public class TestDataNodeVolumeFailure {
FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset();
try (FsDatasetSpi.FsVolumeReferences vols = data.getFsVolumeReferences()) {
for (FsVolumeSpi volume : vols) {
assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(),
dn0Vol1.getAbsoluteFile());
assertFalse(volume.getStorageLocation().getFile()
.getAbsolutePath().startsWith(dn0Vol1.getAbsolutePath()
));
}
}
// 3. all blocks on dn0Vol1 have been removed.
for (ReplicaInfo replica : FsDatasetTestUtil.getReplicas(data, bpid)) {
assertNotNull(replica.getVolume());
assertNotEquals(
new File(replica.getVolume().getBasePath()).getAbsoluteFile(),
dn0Vol1.getAbsoluteFile());
assertFalse(replica.getVolume().getStorageLocation().getFile()
.getAbsolutePath().startsWith(dn0Vol1.getAbsolutePath()
));
}
// 4. dn0Vol1 is not in DN0's configuration and dataDirs anymore.

View File

@ -539,6 +539,16 @@ public class TestDataNodeVolumeFailureReporting {
assertCounter("VolumeFailures", expectedVolumeFailuresCounter,
getMetrics(dn.getMetrics().name()));
FsDatasetSpi<?> fsd = dn.getFSDataset();
StringBuilder strBuilder = new StringBuilder();
strBuilder.append("expectedFailedVolumes is ");
for (String expected: expectedFailedVolumes) {
strBuilder.append(expected + ",");
}
strBuilder.append(" fsd.getFailedStorageLocations() is ");
for (String expected: fsd.getFailedStorageLocations()) {
strBuilder.append(expected + ",");
}
LOG.info(strBuilder.toString());
assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations());
if (expectedFailedVolumes.length > 0) {

View File

@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
@ -44,6 +45,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
@ -56,11 +58,13 @@ import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
@ -185,18 +189,20 @@ public class TestDirectoryScanner {
// Volume without a copy of the block. Make a copy now.
File sourceBlock = new File(b.getBlockURI());
File sourceMeta = new File(b.getMetadataURI());
String sourceRoot = b.getVolume().getBasePath();
String destRoot = v.getBasePath();
URI sourceRoot = b.getVolume().getStorageLocation().getFile().toURI();
URI destRoot = v.getStorageLocation().getFile().toURI();
String relativeBlockPath =
new File(sourceRoot).toURI().relativize(sourceBlock.toURI())
sourceRoot.relativize(sourceBlock.toURI())
.getPath();
String relativeMetaPath =
new File(sourceRoot).toURI().relativize(sourceMeta.toURI())
sourceRoot.relativize(sourceMeta.toURI())
.getPath();
File destBlock = new File(destRoot, relativeBlockPath);
File destMeta = new File(destRoot, relativeMetaPath);
File destBlock = new File(new File(destRoot).toString(),
relativeBlockPath);
File destMeta = new File(new File(destRoot).toString(),
relativeMetaPath);
destBlock.getParentFile().mkdirs();
FileUtils.copyFile(sourceBlock, destBlock);
@ -238,7 +244,8 @@ public class TestDirectoryScanner {
try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
int numVolumes = volumes.size();
int index = rand.nextInt(numVolumes - 1);
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
File finalizedDir = ((FsVolumeImpl) volumes.get(index))
.getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
@ -253,8 +260,8 @@ public class TestDirectoryScanner {
try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
int numVolumes = refs.size();
int index = rand.nextInt(numVolumes - 1);
File finalizedDir = refs.get(index).getFinalizedDir(bpid);
File finalizedDir = ((FsVolumeImpl) refs.get(index))
.getFinalizedDir(bpid);
File file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
@ -271,7 +278,8 @@ public class TestDirectoryScanner {
int numVolumes = refs.size();
int index = rand.nextInt(numVolumes - 1);
File finalizedDir = refs.get(index).getFinalizedDir(bpid);
File finalizedDir =
((FsVolumeImpl) refs.get(index)).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
@ -311,7 +319,7 @@ public class TestDirectoryScanner {
scanner.reconcile();
assertTrue(scanner.diffs.containsKey(bpid));
LinkedList<DirectoryScanner.ScanInfo> diff = scanner.diffs.get(bpid);
LinkedList<FsVolumeSpi.ScanInfo> diff = scanner.diffs.get(bpid);
assertTrue(scanner.stats.containsKey(bpid));
DirectoryScanner.Stats stats = scanner.stats.get(bpid);
@ -820,17 +828,6 @@ public class TestDirectoryScanner {
return 0;
}
@Override
public String getBasePath() {
return (new File("/base")).getAbsolutePath();
}
@Override
public String getPath(String bpid) throws IOException {
return (new File("/base/current/" + bpid)).getAbsolutePath();
}
@Override
public File getFinalizedDir(String bpid) throws IOException {
return new File("/base/current/" + bpid + "/finalized");
}
@ -877,6 +874,29 @@ public class TestDirectoryScanner {
public FsDatasetSpi getDataset() {
throw new UnsupportedOperationException();
}
@Override
public StorageLocation getStorageLocation() {
return null;
}
@Override
public URI getBaseURI() {
return (new File("/base")).toURI();
}
@Override
public DF getUsageStats(Configuration conf) {
return null;
}
@Override
public LinkedList<ScanInfo> compileReport(String bpid,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException, IOException {
return null;
}
}
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
@ -887,8 +907,8 @@ public class TestDirectoryScanner {
void testScanInfoObject(long blockId, File blockFile, File metaFile)
throws Exception {
DirectoryScanner.ScanInfo scanInfo =
new DirectoryScanner.ScanInfo(blockId, blockFile, metaFile, TEST_VOLUME);
FsVolumeSpi.ScanInfo scanInfo =
new FsVolumeSpi.ScanInfo(blockId, blockFile, metaFile, TEST_VOLUME);
assertEquals(blockId, scanInfo.getBlockId());
if (blockFile != null) {
assertEquals(blockFile.getAbsolutePath(),
@ -906,8 +926,8 @@ public class TestDirectoryScanner {
}
void testScanInfoObject(long blockId) throws Exception {
DirectoryScanner.ScanInfo scanInfo =
new DirectoryScanner.ScanInfo(blockId, null, null, null);
FsVolumeSpi.ScanInfo scanInfo =
new FsVolumeSpi.ScanInfo(blockId, null, null, null);
assertEquals(blockId, scanInfo.getBlockId());
assertNull(scanInfo.getBlockFile());
assertNull(scanInfo.getMetaFile());
@ -963,8 +983,8 @@ public class TestDirectoryScanner {
List<FsVolumeSpi> volumes = new ArrayList<>();
Iterator<FsVolumeSpi> iterator = fds.getFsVolumeReferences().iterator();
while (iterator.hasNext()) {
FsVolumeSpi volume = iterator.next();
FsVolumeSpi spy = Mockito.spy(volume);
FsVolumeImpl volume = (FsVolumeImpl) iterator.next();
FsVolumeImpl spy = Mockito.spy(volume);
Mockito.doThrow(new IOException("Error while getFinalizedDir"))
.when(spy).getFinalizedDir(volume.getBlockPoolList()[0]);
volumes.add(spy);

View File

@ -199,7 +199,7 @@ public class TestDiskError {
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
for (FsVolumeSpi vol : volumes) {
String dir = vol.getBasePath();
String dir = vol.getStorageLocation().getFile().getAbsolutePath();
Path dataDir = new Path(dir);
FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
assertEquals("Permission for dir: " + dataDir + ", is " + actual +

View File

@ -56,12 +56,14 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
public void addVolume(StorageLocation location, List<NamespaceInfo> nsInfos) throws IOException {
public void addVolume(StorageLocation location, List<NamespaceInfo> nsInfos)
throws IOException {
}
@Override
public void removeVolumes(Set<File> volumes, boolean clearFailure) {
public void removeVolumes(Collection<StorageLocation> volumes,
boolean clearFailure) {
throw new UnsupportedOperationException();
}
@Override
@ -242,7 +244,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
public Set<File> checkDataDir() {
public Set<StorageLocation> checkDataDir() {
return null;
}

View File

@ -18,11 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode.extdataset;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@ -43,21 +48,6 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
return 0;
}
@Override
public String getBasePath() {
return null;
}
@Override
public String getPath(String bpid) throws IOException {
return null;
}
@Override
public File getFinalizedDir(String bpid) throws IOException {
return null;
}
@Override
public String getStorageID() {
return null;
@ -100,4 +90,26 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
public FsDatasetSpi getDataset() {
return null;
}
@Override
public StorageLocation getStorageLocation() {
return null;
}
@Override
public URI getBaseURI() {
return null;
}
@Override
public DF getUsageStats(Configuration conf) {
return null;
}
@Override
public LinkedList<ScanInfo> compileReport(String bpid,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException, IOException {
return null;
}
}

View File

@ -374,9 +374,12 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
public long getRawCapacity() throws IOException {
try (FsVolumeReferences volRefs = dataset.getFsVolumeReferences()) {
Preconditions.checkState(volRefs.size() != 0);
DF df = new DF(new File(volRefs.get(0).getBasePath()),
dataset.datanode.getConf());
return df.getCapacity();
DF df = volRefs.get(0).getUsageStats(dataset.datanode.getConf());
if (df != null) {
return df.getCapacity();
} else {
return -1;
}
}
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
@ -50,7 +51,9 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MultipleIOException;
@ -122,8 +125,10 @@ public class TestFsDatasetImpl {
private final static String BLOCKPOOL = "BP-TEST";
private static Storage.StorageDirectory createStorageDirectory(File root) {
Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
private static Storage.StorageDirectory createStorageDirectory(File root)
throws SecurityException, IOException {
Storage.StorageDirectory sd = new Storage.StorageDirectory(
StorageLocation.parse(root.toURI().toString()));
DataStorage.createStorageID(sd, false);
return sd;
}
@ -196,16 +201,18 @@ public class TestFsDatasetImpl {
for (int i = 0; i < numNewVolumes; i++) {
String path = BASE_DIR + "/newData" + i;
String pathUri = new Path(path).toUri().toString();
expectedVolumes.add(new File(pathUri).toString());
expectedVolumes.add(new File(pathUri).getAbsolutePath());
StorageLocation loc = StorageLocation.parse(pathUri);
Storage.StorageDirectory sd = createStorageDirectory(new File(path));
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
when(storage.prepareVolume(eq(datanode), eq(loc),
anyListOf(NamespaceInfo.class)))
.thenReturn(builder);
dataset.addVolume(loc, nsInfos);
LOG.info("expectedVolumes " + i + " is " +
new File(pathUri).getAbsolutePath());
}
assertEquals(totalVolumes, getNumVolumes());
@ -215,7 +222,9 @@ public class TestFsDatasetImpl {
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
for (int i = 0; i < numNewVolumes; i++) {
actualVolumes.add(volumes.get(numExistingVolumes + i).getBasePath());
String volumeName = volumes.get(numExistingVolumes + i).toString();
actualVolumes.add(volumeName);
LOG.info("actualVolume " + i + " is " + volumeName);
}
}
assertEquals(actualVolumes.size(), expectedVolumes.size());
@ -262,9 +271,18 @@ public class TestFsDatasetImpl {
final String[] dataDirs =
conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
final String volumePathToRemove = dataDirs[0];
Set<File> volumesToRemove = new HashSet<>();
volumesToRemove.add(StorageLocation.parse(volumePathToRemove).getFile());
Set<StorageLocation> volumesToRemove = new HashSet<>();
volumesToRemove.add(StorageLocation.parse(volumePathToRemove));
FsVolumeReferences volReferences = dataset.getFsVolumeReferences();
FsVolumeImpl volumeToRemove = null;
for (FsVolumeSpi vol: volReferences) {
if (vol.getStorageLocation().equals(volumesToRemove.iterator().next())) {
volumeToRemove = (FsVolumeImpl) vol;
}
}
assertTrue(volumeToRemove != null);
volReferences.close();
dataset.removeVolumes(volumesToRemove, true);
int expectedNumVolumes = dataDirs.length - 1;
assertEquals("The volume has been removed from the volumeList.",
@ -273,7 +291,7 @@ public class TestFsDatasetImpl {
expectedNumVolumes, dataset.storageMap.size());
try {
dataset.asyncDiskService.execute(volumesToRemove.iterator().next(),
dataset.asyncDiskService.execute(volumeToRemove,
new Runnable() {
@Override
public void run() {}
@ -281,7 +299,7 @@ public class TestFsDatasetImpl {
fail("Expect RuntimeException: the volume has been removed from the "
+ "AsyncDiskService.");
} catch (RuntimeException e) {
GenericTestUtils.assertExceptionContains("Cannot find root", e);
GenericTestUtils.assertExceptionContains("Cannot find volume", e);
}
int totalNumReplicas = 0;
@ -306,7 +324,7 @@ public class TestFsDatasetImpl {
Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath));
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
when(storage.prepareVolume(eq(datanode), eq(loc),
anyListOf(NamespaceInfo.class)))
.thenReturn(builder);
@ -315,8 +333,8 @@ public class TestFsDatasetImpl {
when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1);
when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd);
Set<File> volumesToRemove = new HashSet<>();
volumesToRemove.add(loc.getFile());
Set<StorageLocation> volumesToRemove = new HashSet<>();
volumesToRemove.add(loc);
dataset.removeVolumes(volumesToRemove, true);
assertEquals(numExistingVolumes, getNumVolumes());
}
@ -336,7 +354,8 @@ public class TestFsDatasetImpl {
for (int i = 0; i < NUM_VOLUMES; i++) {
FsVolumeImpl volume = mock(FsVolumeImpl.class);
oldVolumes.add(volume);
when(volume.getBasePath()).thenReturn("data" + i);
when(volume.getStorageLocation()).thenReturn(
StorageLocation.parse(new File("data" + i).toURI().toString()));
when(volume.checkClosed()).thenReturn(true);
FsVolumeReference ref = mock(FsVolumeReference.class);
when(ref.getVolume()).thenReturn(volume);
@ -348,13 +367,16 @@ public class TestFsDatasetImpl {
final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
final FsVolumeReference newRef = mock(FsVolumeReference.class);
when(newRef.getVolume()).thenReturn(newVolume);
when(newVolume.getBasePath()).thenReturn("data4");
when(newVolume.getStorageLocation()).thenReturn(
StorageLocation.parse(new File("data4").toURI().toString()));
FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
volumeList.removeVolume(new File("data4"), false);
volumeList.removeVolume(
StorageLocation.parse((new File("data4")).toURI().toString()),
false);
volumeList.addVolume(newRef);
return null;
}
@ -386,7 +408,8 @@ public class TestFsDatasetImpl {
File badDir = new File(BASE_DIR, "bad");
badDir.mkdirs();
doReturn(mockVolume).when(spyDataset)
.createFsVolume(anyString(), any(File.class), any(StorageType.class));
.createFsVolume(anyString(), any(StorageDirectory.class),
any(StorageLocation.class));
doThrow(new IOException("Failed to getVolumeMap()"))
.when(mockVolume).getVolumeMap(
anyString(),
@ -396,7 +419,8 @@ public class TestFsDatasetImpl {
Storage.StorageDirectory sd = createStorageDirectory(badDir);
sd.lock();
DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(badDir.getAbsoluteFile()),
when(storage.prepareVolume(eq(datanode),
eq(StorageLocation.parse(badDir.toURI().toString())),
Matchers.<List<NamespaceInfo>>any()))
.thenReturn(builder);
@ -540,7 +564,7 @@ public class TestFsDatasetImpl {
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(
storage.prepareVolume(eq(datanode), eq(loc.getFile()),
storage.prepareVolume(eq(datanode), eq(loc),
anyListOf(NamespaceInfo.class))).thenReturn(builder);
String cacheFilePath =
@ -584,7 +608,7 @@ public class TestFsDatasetImpl {
return dfsUsed;
}
@Test(timeout = 30000)
@Test(timeout = 60000)
public void testRemoveVolumeBeingWritten() throws Exception {
// Will write and remove on dn0.
final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0);
@ -636,10 +660,9 @@ public class TestFsDatasetImpl {
class VolRemoveThread extends Thread {
public void run() {
Set<File> volumesToRemove = new HashSet<>();
Set<StorageLocation> volumesToRemove = new HashSet<>();
try {
volumesToRemove.add(StorageLocation.parse(
dataset.getVolume(eb).getBasePath()).getFile());
volumesToRemove.add(dataset.getVolume(eb).getStorageLocation());
} catch (Exception e) {
LOG.info("Problem preparing volumes to remove: ", e);
Assert.fail("Exception in remove volume thread, check log for " +

View File

@ -22,7 +22,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
@ -71,8 +73,13 @@ public class TestFsVolumeList {
for (int i = 0; i < 3; i++) {
File curDir = new File(baseDir, "nextvolume-" + i);
curDir.mkdirs();
FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir,
conf, StorageType.DEFAULT);
FsVolumeImpl volume = new FsVolumeImplBuilder()
.setConf(conf)
.setDataset(dataset)
.setStorageID("storage-id")
.setStorageDirectory(
new StorageDirectory(StorageLocation.parse(curDir.getPath())))
.build();
volume.setCapacityForTesting(1024 * 1024 * 1024);
volumes.add(volume);
volumeList.addVolume(volume.obtainReference());
@ -109,8 +116,13 @@ public class TestFsVolumeList {
for (int i = 0; i < 3; i++) {
File curDir = new File(baseDir, "volume-" + i);
curDir.mkdirs();
FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir,
conf, StorageType.DEFAULT);
FsVolumeImpl volume = new FsVolumeImplBuilder()
.setConf(conf)
.setDataset(dataset)
.setStorageID("storage-id")
.setStorageDirectory(
new StorageDirectory(StorageLocation.parse(curDir.getPath())))
.build();
volumes.add(volume);
volumeList.addVolume(volume.obtainReference());
}
@ -139,8 +151,13 @@ public class TestFsVolumeList {
Collections.<VolumeFailureInfo>emptyList(), null, blockChooser);
File volDir = new File(baseDir, "volume-0");
volDir.mkdirs();
FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", volDir,
conf, StorageType.DEFAULT);
FsVolumeImpl volume = new FsVolumeImplBuilder()
.setConf(conf)
.setDataset(dataset)
.setStorageID("storage-id")
.setStorageDirectory(
new StorageDirectory(StorageLocation.parse(volDir.getPath())))
.build();
FsVolumeReference ref = volume.obtainReference();
volumeList.addVolume(ref);
assertNull(ref.getVolume());
@ -155,8 +172,13 @@ public class TestFsVolumeList {
volDir.mkdirs();
// when storage type reserved is not configured,should consider
// dfs.datanode.du.reserved.
FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", volDir, conf,
StorageType.RAM_DISK);
FsVolumeImpl volume = new FsVolumeImplBuilder().setDataset(dataset)
.setStorageDirectory(
new StorageDirectory(
StorageLocation.parse("[RAM_DISK]"+volDir.getPath())))
.setStorageID("storage-id")
.setConf(conf)
.build();
assertEquals("", 100L, volume.getReserved());
// when storage type reserved is configured.
conf.setLong(
@ -165,17 +187,37 @@ public class TestFsVolumeList {
conf.setLong(
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY + "."
+ StringUtils.toLowerCase(StorageType.SSD.toString()), 2L);
FsVolumeImpl volume1 = new FsVolumeImpl(dataset, "storage-id", volDir,
conf, StorageType.RAM_DISK);
FsVolumeImpl volume1 = new FsVolumeImplBuilder().setDataset(dataset)
.setStorageDirectory(
new StorageDirectory(
StorageLocation.parse("[RAM_DISK]"+volDir.getPath())))
.setStorageID("storage-id")
.setConf(conf)
.build();
assertEquals("", 1L, volume1.getReserved());
FsVolumeImpl volume2 = new FsVolumeImpl(dataset, "storage-id", volDir,
conf, StorageType.SSD);
FsVolumeImpl volume2 = new FsVolumeImplBuilder().setDataset(dataset)
.setStorageDirectory(
new StorageDirectory(
StorageLocation.parse("[SSD]"+volDir.getPath())))
.setStorageID("storage-id")
.setConf(conf)
.build();
assertEquals("", 2L, volume2.getReserved());
FsVolumeImpl volume3 = new FsVolumeImpl(dataset, "storage-id", volDir,
conf, StorageType.DISK);
FsVolumeImpl volume3 = new FsVolumeImplBuilder().setDataset(dataset)
.setStorageDirectory(
new StorageDirectory(
StorageLocation.parse("[DISK]"+volDir.getPath())))
.setStorageID("storage-id")
.setConf(conf)
.build();
assertEquals("", 100L, volume3.getReserved());
FsVolumeImpl volume4 = new FsVolumeImpl(dataset, "storage-id", volDir,
conf, StorageType.DEFAULT);
FsVolumeImpl volume4 = new FsVolumeImplBuilder().setDataset(dataset)
.setStorageDirectory(
new StorageDirectory(
StorageLocation.parse(volDir.getPath())))
.setStorageID("storage-id")
.setConf(conf)
.build();
assertEquals("", 100L, volume4.getReserved());
}
@ -197,8 +239,13 @@ public class TestFsVolumeList {
long actualNonDfsUsage = 300L;
long reservedForReplicas = 50L;
conf.setLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, duReserved);
FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", volDir, conf,
StorageType.DEFAULT);
FsVolumeImpl volume = new FsVolumeImplBuilder().setDataset(dataset)
.setStorageDirectory(
new StorageDirectory(
StorageLocation.parse(volDir.getPath())))
.setStorageID("storage-id")
.setConf(conf)
.build();
FsVolumeImpl spyVolume = Mockito.spy(volume);
// Set Capacity for testing
long testCapacity = diskCapacity - duReserved;

View File

@ -331,8 +331,8 @@ public class TestDiskBalancerWithMockMover {
.getFsVolumeReferences();
nodeID = dataNode.getDatanodeUuid();
sourceName = references.get(0).getBasePath();
destName = references.get(1).getBasePath();
sourceName = references.get(0).getBaseURI().getPath();
destName = references.get(1).getBaseURI().getPath();
sourceUUID = references.get(0).getStorageID();
destUUID = references.get(1).getStorageID();
references.close();