HDFS-10638. Modifications to remove the assumption that StorageLocation is associated with java.io.File in Datanode. (Virajith Jalaparti via lei)
This commit is contained in:
parent
1f8490a5ba
commit
f209e93566
|
@ -278,7 +278,7 @@ public StorageDirectory(File dir) {
|
|||
|
||||
public StorageDirectory(StorageLocation location) {
|
||||
// default dirType is null
|
||||
this(location.getFile(), null, false, location);
|
||||
this(null, false, location);
|
||||
}
|
||||
|
||||
public StorageDirectory(File dir, StorageDirType dirType) {
|
||||
|
@ -304,20 +304,57 @@ public StorageDirectory(File dir, StorageDirType dirType, boolean isShared) {
|
|||
this(dir, dirType, isShared, null);
|
||||
}
|
||||
|
||||
public StorageDirectory(File dir, StorageDirType dirType,
|
||||
/**
|
||||
* Constructor
|
||||
* @param dirType storage directory type
|
||||
* @param isShared whether or not this dir is shared between two NNs. true
|
||||
* disables locking on the storage directory, false enables locking
|
||||
* @param location the {@link StorageLocation} for this directory
|
||||
*/
|
||||
public StorageDirectory(StorageDirType dirType, boolean isShared,
|
||||
StorageLocation location) {
|
||||
this(getStorageLocationFile(location), dirType, isShared, location);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param bpid the block pool id
|
||||
* @param dirType storage directory type
|
||||
* @param isShared whether or not this dir is shared between two NNs. true
|
||||
* disables locking on the storage directory, false enables locking
|
||||
* @param location the {@link StorageLocation} for this directory
|
||||
*/
|
||||
public StorageDirectory(String bpid, StorageDirType dirType,
|
||||
boolean isShared, StorageLocation location) {
|
||||
this(new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT)), dirType,
|
||||
isShared, location);
|
||||
}
|
||||
|
||||
private StorageDirectory(File dir, StorageDirType dirType,
|
||||
boolean isShared, StorageLocation location) {
|
||||
this.root = dir;
|
||||
this.lock = null;
|
||||
this.dirType = dirType;
|
||||
this.isShared = isShared;
|
||||
this.location = location;
|
||||
assert location == null ||
|
||||
assert location == null || dir == null ||
|
||||
dir.getAbsolutePath().startsWith(
|
||||
location.getFile().getAbsolutePath()):
|
||||
new File(location.getUri()).getAbsolutePath()):
|
||||
"The storage location and directory should be equal";
|
||||
}
|
||||
|
||||
|
||||
private static File getStorageLocationFile(StorageLocation location) {
|
||||
if (location == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return new File(location.getUri());
|
||||
} catch (IllegalArgumentException e) {
|
||||
//if location does not refer to a File
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get root directory of this storage
|
||||
*/
|
||||
|
@ -932,6 +969,41 @@ protected boolean containsStorageDir(File root) throws IOException {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the storage directory on the given directory is already
|
||||
* loaded.
|
||||
* @param location the {@link StorageLocation}
|
||||
* @throws IOException if failed to get canonical path.
|
||||
*/
|
||||
protected boolean containsStorageDir(StorageLocation location)
|
||||
throws IOException {
|
||||
for (StorageDirectory sd : storageDirs) {
|
||||
if (location.matchesStorageDirectory(sd)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the storage directory on the given location is already
|
||||
* loaded.
|
||||
* @param location the {@link StorageLocation}
|
||||
* @param bpid the block pool id
|
||||
* @return true if the location matches to any existing storage directories
|
||||
* @throws IOException IOException if failed to read location
|
||||
* or storage directory path
|
||||
*/
|
||||
protected boolean containsStorageDir(StorageLocation location, String bpid)
|
||||
throws IOException {
|
||||
for (StorageDirectory sd : storageDirs) {
|
||||
if (location.matchesStorageDirectory(sd, bpid)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if the layout of the given storage directory is from a version
|
||||
* of Hadoop prior to the introduction of the "current" and "previous"
|
||||
|
|
|
@ -22,7 +22,6 @@
|
|||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -147,10 +146,11 @@ public void addStorageDir(StorageDirectory sd) {
|
|||
* @throws IOException
|
||||
*/
|
||||
private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
|
||||
File dataDir, StorageLocation location, StartupOption startOpt,
|
||||
StorageLocation location, StartupOption startOpt,
|
||||
List<Callable<StorageDirectory>> callables, Configuration conf)
|
||||
throws IOException {
|
||||
StorageDirectory sd = new StorageDirectory(dataDir, null, true, location);
|
||||
StorageDirectory sd = new StorageDirectory(
|
||||
nsInfo.getBlockPoolID(), null, true, location);
|
||||
try {
|
||||
StorageState curState = sd.analyzeStorage(startOpt, this, true);
|
||||
// sd is locked but not opened
|
||||
|
@ -158,11 +158,15 @@ private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
|
|||
case NORMAL:
|
||||
break;
|
||||
case NON_EXISTENT:
|
||||
LOG.info("Block pool storage directory " + dataDir + " does not exist");
|
||||
throw new IOException("Storage directory " + dataDir
|
||||
+ " does not exist");
|
||||
LOG.info("Block pool storage directory for location " + location +
|
||||
" and block pool id " + nsInfo.getBlockPoolID() +
|
||||
" does not exist");
|
||||
throw new IOException("Storage directory for location " + location +
|
||||
" and block pool id " + nsInfo.getBlockPoolID() +
|
||||
" does not exist");
|
||||
case NOT_FORMATTED: // format
|
||||
LOG.info("Block pool storage directory " + dataDir
|
||||
LOG.info("Block pool storage directory for location " + location +
|
||||
" and block pool id " + nsInfo.getBlockPoolID()
|
||||
+ " is not formatted for " + nsInfo.getBlockPoolID()
|
||||
+ ". Formatting ...");
|
||||
format(sd, nsInfo);
|
||||
|
@ -208,21 +212,19 @@ private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
|
|||
* @throws IOException on error
|
||||
*/
|
||||
List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
|
||||
Collection<File> dataDirs, StorageLocation location,
|
||||
StartupOption startOpt, List<Callable<StorageDirectory>> callables,
|
||||
Configuration conf) throws IOException {
|
||||
StorageLocation location, StartupOption startOpt,
|
||||
List<Callable<StorageDirectory>> callables, Configuration conf)
|
||||
throws IOException {
|
||||
List<StorageDirectory> succeedDirs = Lists.newArrayList();
|
||||
try {
|
||||
for (File dataDir : dataDirs) {
|
||||
if (containsStorageDir(dataDir)) {
|
||||
throw new IOException(
|
||||
"BlockPoolSliceStorage.recoverTransitionRead: " +
|
||||
"attempt to load an used block storage: " + dataDir);
|
||||
}
|
||||
final StorageDirectory sd = loadStorageDirectory(
|
||||
nsInfo, dataDir, location, startOpt, callables, conf);
|
||||
succeedDirs.add(sd);
|
||||
if (containsStorageDir(location, nsInfo.getBlockPoolID())) {
|
||||
throw new IOException(
|
||||
"BlockPoolSliceStorage.recoverTransitionRead: " +
|
||||
"attempt to load an used block storage: " + location);
|
||||
}
|
||||
final StorageDirectory sd = loadStorageDirectory(
|
||||
nsInfo, location, startOpt, callables, conf);
|
||||
succeedDirs.add(sd);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to analyze storage directories for block pool "
|
||||
+ nsInfo.getBlockPoolID(), e);
|
||||
|
@ -244,12 +246,12 @@ List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
|
|||
* @throws IOException on error
|
||||
*/
|
||||
List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
|
||||
Collection<File> dataDirs, StorageLocation location,
|
||||
StartupOption startOpt, List<Callable<StorageDirectory>> callables,
|
||||
Configuration conf) throws IOException {
|
||||
StorageLocation location, StartupOption startOpt,
|
||||
List<Callable<StorageDirectory>> callables, Configuration conf)
|
||||
throws IOException {
|
||||
LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
|
||||
final List<StorageDirectory> loaded = loadBpStorageDirectories(
|
||||
nsInfo, dataDirs, location, startOpt, callables, conf);
|
||||
nsInfo, location, startOpt, callables, conf);
|
||||
for (StorageDirectory sd : loaded) {
|
||||
addStorageDir(sd);
|
||||
}
|
||||
|
|
|
@ -648,7 +648,7 @@ ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
|
|||
// Use the existing StorageLocation to detect storage type changes.
|
||||
Map<String, StorageLocation> existingLocations = new HashMap<>();
|
||||
for (StorageLocation loc : getStorageLocations(getConf())) {
|
||||
existingLocations.put(loc.getFile().getCanonicalPath(), loc);
|
||||
existingLocations.put(loc.getNormalizedUri().toString(), loc);
|
||||
}
|
||||
|
||||
ChangedVolumes results = new ChangedVolumes();
|
||||
|
@ -661,11 +661,10 @@ ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
|
|||
for (Iterator<StorageLocation> sl = results.newLocations.iterator();
|
||||
sl.hasNext(); ) {
|
||||
StorageLocation location = sl.next();
|
||||
if (location.getFile().getCanonicalPath().equals(
|
||||
dir.getRoot().getCanonicalPath())) {
|
||||
if (location.matchesStorageDirectory(dir)) {
|
||||
sl.remove();
|
||||
StorageLocation old = existingLocations.get(
|
||||
location.getFile().getCanonicalPath());
|
||||
location.getNormalizedUri().toString());
|
||||
if (old != null &&
|
||||
old.getStorageType() != location.getStorageType()) {
|
||||
throw new IOException("Changing storage type is not allowed.");
|
||||
|
@ -2676,7 +2675,7 @@ static List<StorageLocation> checkStorageLocations(
|
|||
locations.add(location);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
|
||||
+ location.getFile() + " : ", ioe);
|
||||
+ location + " : ", ioe);
|
||||
invalidDirs.append("\"").append(uri.getPath()).append("\" ");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,15 +46,10 @@
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.HardLink;
|
||||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
|
@ -66,8 +61,6 @@
|
|||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ComparisonChain;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -263,10 +256,9 @@ public void build() {
|
|||
}
|
||||
|
||||
private StorageDirectory loadStorageDirectory(DataNode datanode,
|
||||
NamespaceInfo nsInfo, File dataDir, StorageLocation location,
|
||||
StartupOption startOpt, List<Callable<StorageDirectory>> callables)
|
||||
throws IOException {
|
||||
StorageDirectory sd = new StorageDirectory(dataDir, null, false, location);
|
||||
NamespaceInfo nsInfo, StorageLocation location, StartupOption startOpt,
|
||||
List<Callable<StorageDirectory>> callables) throws IOException {
|
||||
StorageDirectory sd = new StorageDirectory(null, false, location);
|
||||
try {
|
||||
StorageState curState = sd.analyzeStorage(startOpt, this, true);
|
||||
// sd is locked but not opened
|
||||
|
@ -274,11 +266,12 @@ private StorageDirectory loadStorageDirectory(DataNode datanode,
|
|||
case NORMAL:
|
||||
break;
|
||||
case NON_EXISTENT:
|
||||
LOG.info("Storage directory " + dataDir + " does not exist");
|
||||
throw new IOException("Storage directory " + dataDir
|
||||
LOG.info("Storage directory with location " + location
|
||||
+ " does not exist");
|
||||
throw new IOException("Storage directory with location " + location
|
||||
+ " does not exist");
|
||||
case NOT_FORMATTED: // format
|
||||
LOG.info("Storage directory " + dataDir
|
||||
LOG.info("Storage directory with location " + location
|
||||
+ " is not formatted for namespace " + nsInfo.getNamespaceID()
|
||||
+ ". Formatting...");
|
||||
format(sd, nsInfo, datanode.getDatanodeUuid());
|
||||
|
@ -322,28 +315,22 @@ private StorageDirectory loadStorageDirectory(DataNode datanode,
|
|||
public VolumeBuilder prepareVolume(DataNode datanode,
|
||||
StorageLocation location, List<NamespaceInfo> nsInfos)
|
||||
throws IOException {
|
||||
File volume = location.getFile();
|
||||
if (containsStorageDir(volume)) {
|
||||
if (containsStorageDir(location)) {
|
||||
final String errorMessage = "Storage directory is in use";
|
||||
LOG.warn(errorMessage + ".");
|
||||
throw new IOException(errorMessage);
|
||||
}
|
||||
|
||||
StorageDirectory sd = loadStorageDirectory(
|
||||
datanode, nsInfos.get(0), volume, location,
|
||||
StartupOption.HOTSWAP, null);
|
||||
datanode, nsInfos.get(0), location, StartupOption.HOTSWAP, null);
|
||||
VolumeBuilder builder =
|
||||
new VolumeBuilder(this, sd);
|
||||
for (NamespaceInfo nsInfo : nsInfos) {
|
||||
List<File> bpDataDirs = Lists.newArrayList();
|
||||
bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(
|
||||
nsInfo.getBlockPoolID(), new File(volume, STORAGE_DIR_CURRENT)));
|
||||
makeBlockPoolDataDir(bpDataDirs, null);
|
||||
location.makeBlockPoolDir(nsInfo.getBlockPoolID(), null);
|
||||
|
||||
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
|
||||
final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
|
||||
nsInfo, bpDataDirs, location, StartupOption.HOTSWAP,
|
||||
null, datanode.getConf());
|
||||
nsInfo, location, StartupOption.HOTSWAP, null, datanode.getConf());
|
||||
builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
|
||||
}
|
||||
return builder;
|
||||
|
@ -405,14 +392,13 @@ private List<StorageLocation> loadDataStorage(DataNode datanode,
|
|||
final List<StorageLocation> success = Lists.newArrayList();
|
||||
final List<UpgradeTask> tasks = Lists.newArrayList();
|
||||
for (StorageLocation dataDir : dataDirs) {
|
||||
File root = dataDir.getFile();
|
||||
if (!containsStorageDir(root)) {
|
||||
if (!containsStorageDir(dataDir)) {
|
||||
try {
|
||||
// It first ensures the datanode level format is completed.
|
||||
final List<Callable<StorageDirectory>> callables
|
||||
= Lists.newArrayList();
|
||||
final StorageDirectory sd = loadStorageDirectory(
|
||||
datanode, nsInfo, root, dataDir, startOpt, callables);
|
||||
datanode, nsInfo, dataDir, startOpt, callables);
|
||||
if (callables.isEmpty()) {
|
||||
addStorageDir(sd);
|
||||
success.add(dataDir);
|
||||
|
@ -455,16 +441,11 @@ private List<StorageDirectory> loadBlockPoolSliceStorage(DataNode datanode,
|
|||
final List<StorageDirectory> success = Lists.newArrayList();
|
||||
final List<UpgradeTask> tasks = Lists.newArrayList();
|
||||
for (StorageLocation dataDir : dataDirs) {
|
||||
final File curDir = new File(dataDir.getFile(), STORAGE_DIR_CURRENT);
|
||||
List<File> bpDataDirs = new ArrayList<File>();
|
||||
bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(bpid, curDir));
|
||||
dataDir.makeBlockPoolDir(bpid, null);
|
||||
try {
|
||||
makeBlockPoolDataDir(bpDataDirs, null);
|
||||
|
||||
final List<Callable<StorageDirectory>> callables = Lists.newArrayList();
|
||||
final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
|
||||
nsInfo, bpDataDirs, dataDir, startOpt,
|
||||
callables, datanode.getConf());
|
||||
nsInfo, dataDir, startOpt, callables, datanode.getConf());
|
||||
if (callables.isEmpty()) {
|
||||
for(StorageDirectory sd : dirs) {
|
||||
success.add(sd);
|
||||
|
@ -566,34 +547,6 @@ void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create physical directory for block pools on the data node
|
||||
*
|
||||
* @param dataDirs
|
||||
* List of data directories
|
||||
* @param conf
|
||||
* Configuration instance to use.
|
||||
* @throws IOException on errors
|
||||
*/
|
||||
static void makeBlockPoolDataDir(Collection<File> dataDirs,
|
||||
Configuration conf) throws IOException {
|
||||
if (conf == null)
|
||||
conf = new HdfsConfiguration();
|
||||
|
||||
LocalFileSystem localFS = FileSystem.getLocal(conf);
|
||||
FsPermission permission = new FsPermission(conf.get(
|
||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
|
||||
for (File data : dataDirs) {
|
||||
try {
|
||||
DiskChecker.checkDir(localFS, new Path(data.toURI()), permission);
|
||||
} catch ( IOException e ) {
|
||||
LOG.warn("Invalid directory in: " + data.getCanonicalPath() + ": "
|
||||
+ e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void format(StorageDirectory sd, NamespaceInfo nsInfo,
|
||||
String datanodeUuid) throws IOException {
|
||||
sd.clearDirectory(); // create directory
|
||||
|
|
|
@ -351,7 +351,13 @@ private boolean renameFile(File srcfile, File destfile) throws IOException {
|
|||
@Override
|
||||
public void updateWithReplica(StorageLocation replicaLocation) {
|
||||
// for local replicas, the replica location is assumed to be a file.
|
||||
File diskFile = replicaLocation.getFile();
|
||||
File diskFile = null;
|
||||
try {
|
||||
diskFile = new File(replicaLocation.getUri());
|
||||
} catch (IllegalArgumentException e) {
|
||||
diskFile = null;
|
||||
}
|
||||
|
||||
if (null == diskFile) {
|
||||
setDirInternal(null);
|
||||
} else {
|
||||
|
|
|
@ -23,11 +23,21 @@
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.regex.Matcher;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
|
||||
|
@ -40,8 +50,7 @@
|
|||
@InterfaceAudience.Private
|
||||
public class StorageLocation implements Comparable<StorageLocation>{
|
||||
final StorageType storageType;
|
||||
final File file;
|
||||
|
||||
private final URI baseURI;
|
||||
/** Regular expression that describes a storage uri with a storage type.
|
||||
* e.g. [Disk]/storages/storage1/
|
||||
*/
|
||||
|
@ -49,26 +58,41 @@ public class StorageLocation implements Comparable<StorageLocation>{
|
|||
|
||||
private StorageLocation(StorageType storageType, URI uri) {
|
||||
this.storageType = storageType;
|
||||
|
||||
if (uri.getScheme() == null ||
|
||||
"file".equalsIgnoreCase(uri.getScheme())) {
|
||||
// drop any (illegal) authority in the URI for backwards compatibility
|
||||
this.file = new File(uri.getPath());
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported URI ecPolicy in " + uri);
|
||||
if (uri.getScheme() == null || uri.getScheme().equals("file")) {
|
||||
// make sure all URIs that point to a file have the same scheme
|
||||
try {
|
||||
File uriFile = new File(uri.getPath());
|
||||
String absPath = uriFile.getAbsolutePath();
|
||||
uri = new URI("file", null, absPath, uri.getQuery(), uri.getFragment());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(
|
||||
"URI: " + uri + " is not in the expected format");
|
||||
}
|
||||
}
|
||||
baseURI = uri;
|
||||
}
|
||||
|
||||
public StorageType getStorageType() {
|
||||
return this.storageType;
|
||||
}
|
||||
|
||||
URI getUri() {
|
||||
return file.toURI();
|
||||
public URI getUri() {
|
||||
return baseURI;
|
||||
}
|
||||
|
||||
public File getFile() {
|
||||
return this.file;
|
||||
public URI getNormalizedUri() {
|
||||
return baseURI.normalize();
|
||||
}
|
||||
|
||||
public boolean matchesStorageDirectory(StorageDirectory sd)
|
||||
throws IOException {
|
||||
return this.equals(sd.getStorageLocation());
|
||||
}
|
||||
|
||||
public boolean matchesStorageDirectory(StorageDirectory sd,
|
||||
String bpid) throws IOException {
|
||||
return this.getBpURI(bpid, Storage.STORAGE_DIR_CURRENT).normalize()
|
||||
.equals(sd.getRoot().toURI().normalize());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -94,13 +118,14 @@ public static StorageLocation parse(String rawLocation)
|
|||
StorageType.valueOf(StringUtils.toUpperCase(classString));
|
||||
}
|
||||
}
|
||||
|
||||
//do Path.toURI instead of new URI(location) as this ensures that
|
||||
//"/a/b" and "/a/b/" are represented in a consistent manner
|
||||
return new StorageLocation(storageType, new Path(location).toUri());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[" + storageType + "]" + file.toURI();
|
||||
return "[" + storageType + "]" + baseURI.normalize();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -126,16 +151,56 @@ public int compareTo(StorageLocation obj) {
|
|||
}
|
||||
|
||||
StorageLocation otherStorage = (StorageLocation) obj;
|
||||
if (this.getFile() != null && otherStorage.getFile() != null) {
|
||||
return this.getFile().getAbsolutePath().compareTo(
|
||||
otherStorage.getFile().getAbsolutePath());
|
||||
} else if (this.getFile() == null && otherStorage.getFile() == null) {
|
||||
if (this.getNormalizedUri() != null &&
|
||||
otherStorage.getNormalizedUri() != null) {
|
||||
return this.getNormalizedUri().compareTo(
|
||||
otherStorage.getNormalizedUri());
|
||||
} else if (this.getNormalizedUri() == null &&
|
||||
otherStorage.getNormalizedUri() == null) {
|
||||
return this.storageType.compareTo(otherStorage.getStorageType());
|
||||
} else if (this.getFile() == null) {
|
||||
} else if (this.getNormalizedUri() == null) {
|
||||
return -1;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public URI getBpURI(String bpid, String currentStorageDir) {
|
||||
try {
|
||||
File localFile = new File(getUri());
|
||||
return new File(new File(localFile, currentStorageDir), bpid).toURI();
|
||||
} catch (IllegalArgumentException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create physical directory for block pools on the data node.
|
||||
*
|
||||
* @param blockPoolID
|
||||
* the block pool id
|
||||
* @param conf
|
||||
* Configuration instance to use.
|
||||
* @throws IOException on errors
|
||||
*/
|
||||
public void makeBlockPoolDir(String blockPoolID,
|
||||
Configuration conf) throws IOException {
|
||||
|
||||
if (conf == null) {
|
||||
conf = new HdfsConfiguration();
|
||||
}
|
||||
|
||||
LocalFileSystem localFS = FileSystem.getLocal(conf);
|
||||
FsPermission permission = new FsPermission(conf.get(
|
||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
|
||||
File data = new File(getBpURI(blockPoolID, Storage.STORAGE_DIR_CURRENT));
|
||||
try {
|
||||
DiskChecker.checkDir(localFS, new Path(data.toURI()), permission);
|
||||
} catch (IOException e) {
|
||||
DataStorage.LOG.warn("Invalid directory in: " + data.getCanonicalPath() +
|
||||
": " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -562,16 +562,6 @@ public void removeVolumes(
|
|||
}
|
||||
}
|
||||
|
||||
private StorageType getStorageTypeFromLocations(
|
||||
Collection<StorageLocation> dataLocations, File dir) {
|
||||
for (StorageLocation dataLocation : dataLocations) {
|
||||
if (dataLocation.getFile().equals(dir)) {
|
||||
return dataLocation.getStorageType();
|
||||
}
|
||||
}
|
||||
return StorageType.DEFAULT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the total space used by dfs datanode
|
||||
*/
|
||||
|
@ -635,7 +625,7 @@ public String[] getFailedStorageLocations() {
|
|||
infos.length);
|
||||
for (VolumeFailureInfo info: infos) {
|
||||
failedStorageLocations.add(
|
||||
info.getFailedStorageLocation().getFile().getAbsolutePath());
|
||||
info.getFailedStorageLocation().getNormalizedUri().toString());
|
||||
}
|
||||
return failedStorageLocations.toArray(
|
||||
new String[failedStorageLocations.size()]);
|
||||
|
@ -674,7 +664,7 @@ public VolumeFailureSummary getVolumeFailureSummary() {
|
|||
long estimatedCapacityLostTotal = 0;
|
||||
for (VolumeFailureInfo info: infos) {
|
||||
failedStorageLocations.add(
|
||||
info.getFailedStorageLocation().getFile().getAbsolutePath());
|
||||
info.getFailedStorageLocation().getNormalizedUri().toString());
|
||||
long failureDate = info.getFailureDate();
|
||||
if (failureDate > lastVolumeFailureDate) {
|
||||
lastVolumeFailureDate = failureDate;
|
||||
|
|
|
@ -231,9 +231,9 @@ public void testRemovingStorageDoesNotProduceZombies() throws Exception {
|
|||
// it would be re-initialized with a new storage ID.)
|
||||
assertNotNull(volumeLocationToRemove);
|
||||
datanodeToRemoveStorageFrom.shutdown();
|
||||
FileUtil.fullyDelete(volumeLocationToRemove.getFile());
|
||||
FileUtil.fullyDelete(new File(volumeLocationToRemove.getUri()));
|
||||
FileOutputStream fos = new FileOutputStream(
|
||||
volumeLocationToRemove.getFile().toString());
|
||||
new File(volumeLocationToRemove.getUri()));
|
||||
try {
|
||||
fos.write(1);
|
||||
} finally {
|
||||
|
@ -327,7 +327,8 @@ public void testRenamingStorageIds() throws Exception {
|
|||
final String newStorageId = DatanodeStorage.generateUuid();
|
||||
try {
|
||||
File currentDir = new File(
|
||||
volumeRefs.get(0).getStorageLocation().getFile(), "current");
|
||||
new File(volumeRefs.get(0).getStorageLocation().getUri()),
|
||||
"current");
|
||||
File versionFile = new File(currentDir, "VERSION");
|
||||
rewriteVersionFile(versionFile, newStorageId);
|
||||
} finally {
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
@ -549,9 +550,8 @@ public Boolean get() {
|
|||
info.shouldRun = false;
|
||||
}
|
||||
ctx.datanode.shutdown();
|
||||
String vPath = ctx.volumes.get(0).getStorageLocation()
|
||||
.getFile().getAbsolutePath();
|
||||
File cursorPath = new File(new File(new File(vPath, "current"),
|
||||
URI vURI = ctx.volumes.get(0).getStorageLocation().getUri();
|
||||
File cursorPath = new File(new File(new File(new File(vURI), "current"),
|
||||
ctx.bpids[0]), "scanner.cursor");
|
||||
assertTrue("Failed to find cursor save file in " +
|
||||
cursorPath.getAbsolutePath(), cursorPath.exists());
|
||||
|
|
|
@ -114,7 +114,8 @@ public void testDataDirValidation() throws Throwable {
|
|||
List<StorageLocation> checkedLocations =
|
||||
DataNode.checkStorageLocations(locations, fs, diskChecker);
|
||||
assertEquals("number of valid data dirs", 1, checkedLocations.size());
|
||||
String validDir = checkedLocations.iterator().next().getFile().getPath();
|
||||
String validDir =
|
||||
new File(checkedLocations.iterator().next().getUri()).getPath();
|
||||
assertThat("p3 should be valid", new File("/p3/").getPath(), is(validDir));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -221,7 +221,7 @@ public void testParseChangedVolumes() throws IOException {
|
|||
}
|
||||
assertFalse(oldLocations.isEmpty());
|
||||
|
||||
String newPaths = oldLocations.get(0).getFile().getAbsolutePath() +
|
||||
String newPaths = new File(oldLocations.get(0).getUri()).getAbsolutePath() +
|
||||
",/foo/path1,/foo/path2";
|
||||
|
||||
DataNode.ChangedVolumes changedVolumes =
|
||||
|
@ -229,18 +229,18 @@ public void testParseChangedVolumes() throws IOException {
|
|||
List<StorageLocation> newVolumes = changedVolumes.newLocations;
|
||||
assertEquals(2, newVolumes.size());
|
||||
assertEquals(new File("/foo/path1").getAbsolutePath(),
|
||||
newVolumes.get(0).getFile().getAbsolutePath());
|
||||
new File(newVolumes.get(0).getUri()).getAbsolutePath());
|
||||
assertEquals(new File("/foo/path2").getAbsolutePath(),
|
||||
newVolumes.get(1).getFile().getAbsolutePath());
|
||||
new File(newVolumes.get(1).getUri()).getAbsolutePath());
|
||||
|
||||
List<StorageLocation> removedVolumes = changedVolumes.deactivateLocations;
|
||||
assertEquals(1, removedVolumes.size());
|
||||
assertEquals(oldLocations.get(1).getFile(),
|
||||
removedVolumes.get(0).getFile());
|
||||
assertEquals(oldLocations.get(1).getNormalizedUri(),
|
||||
removedVolumes.get(0).getNormalizedUri());
|
||||
|
||||
assertEquals(1, changedVolumes.unchangedLocations.size());
|
||||
assertEquals(oldLocations.get(0).getFile(),
|
||||
changedVolumes.unchangedLocations.get(0).getFile());
|
||||
assertEquals(oldLocations.get(0).getNormalizedUri(),
|
||||
changedVolumes.unchangedLocations.get(0).getNormalizedUri());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -519,7 +519,7 @@ public void testReplicatingAfterRemoveVolume()
|
|||
DFSTestUtil.getAllBlocks(fs, testFile).get(1).getBlock();
|
||||
FsVolumeSpi volumeWithBlock = dn.getFSDataset().getVolume(block);
|
||||
String dirWithBlock = "[" + volumeWithBlock.getStorageType() + "]" +
|
||||
volumeWithBlock.getStorageLocation().getFile().toURI();
|
||||
volumeWithBlock.getStorageLocation().getUri();
|
||||
String newDirs = dirWithBlock;
|
||||
for (String dir : oldDirs) {
|
||||
if (dirWithBlock.startsWith(dir)) {
|
||||
|
@ -577,7 +577,7 @@ public void testAddVolumeFailures() throws IOException {
|
|||
try (FsDatasetSpi.FsVolumeReferences volumes =
|
||||
dataset.getFsVolumeReferences()) {
|
||||
for (FsVolumeSpi volume : volumes) {
|
||||
assertThat(volume.getStorageLocation().getFile().toString(),
|
||||
assertThat(new File(volume.getStorageLocation().getUri()).toString(),
|
||||
is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
|
||||
}
|
||||
}
|
||||
|
@ -593,8 +593,10 @@ public void testAddVolumeFailures() throws IOException {
|
|||
dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY).split(",");
|
||||
assertEquals(4, effectiveVolumes.length);
|
||||
for (String ev : effectiveVolumes) {
|
||||
assertThat(StorageLocation.parse(ev).getFile().getCanonicalPath(),
|
||||
is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
|
||||
assertThat(
|
||||
new File(StorageLocation.parse(ev).getUri()).getCanonicalPath(),
|
||||
is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2)))))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -253,7 +253,7 @@ public void testFailedVolumeBeingRemovedFromDataNode()
|
|||
FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset();
|
||||
try (FsDatasetSpi.FsVolumeReferences vols = data.getFsVolumeReferences()) {
|
||||
for (FsVolumeSpi volume : vols) {
|
||||
assertFalse(volume.getStorageLocation().getFile()
|
||||
assertFalse(new File(volume.getStorageLocation().getUri())
|
||||
.getAbsolutePath().startsWith(dn0Vol1.getAbsolutePath()
|
||||
));
|
||||
}
|
||||
|
@ -262,7 +262,7 @@ public void testFailedVolumeBeingRemovedFromDataNode()
|
|||
// 3. all blocks on dn0Vol1 have been removed.
|
||||
for (ReplicaInfo replica : FsDatasetTestUtil.getReplicas(data, bpid)) {
|
||||
assertNotNull(replica.getVolume());
|
||||
assertFalse(replica.getVolume().getStorageLocation().getFile()
|
||||
assertFalse(new File(replica.getVolume().getStorageLocation().getUri())
|
||||
.getAbsolutePath().startsWith(dn0Vol1.getAbsolutePath()
|
||||
));
|
||||
}
|
||||
|
|
|
@ -29,6 +29,8 @@
|
|||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -467,7 +469,8 @@ public void testAutoFormatEmptyDirectory() throws Exception {
|
|||
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||
FsDatasetSpi<?> fsd = dn.getFSDataset();
|
||||
assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
|
||||
assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations());
|
||||
assertArrayEquals(expectedFailedVolumes,
|
||||
convertToAbsolutePaths(fsd.getFailedStorageLocations()));
|
||||
// there shouldn't be any more volume failures due to I/O failure
|
||||
checkFailuresAtDataNode(dn, 0, false, expectedFailedVolumes);
|
||||
|
||||
|
@ -550,7 +553,8 @@ private void checkFailuresAtDataNode(DataNode dn,
|
|||
}
|
||||
LOG.info(strBuilder.toString());
|
||||
assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
|
||||
assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations());
|
||||
assertArrayEquals(expectedFailedVolumes,
|
||||
convertToAbsolutePaths(fsd.getFailedStorageLocations()));
|
||||
if (expectedFailedVolumes.length > 0) {
|
||||
assertTrue(fsd.getLastVolumeFailureDate() > 0);
|
||||
long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
|
||||
|
@ -582,8 +586,9 @@ private void checkFailuresAtNameNode(DatanodeManager dm, DataNode dn,
|
|||
assertEquals(expectedFailedVolumes.length, dd.getVolumeFailures());
|
||||
VolumeFailureSummary volumeFailureSummary = dd.getVolumeFailureSummary();
|
||||
if (expectedFailedVolumes.length > 0) {
|
||||
assertArrayEquals(expectedFailedVolumes, volumeFailureSummary
|
||||
.getFailedStorageLocations());
|
||||
assertArrayEquals(expectedFailedVolumes,
|
||||
convertToAbsolutePaths(volumeFailureSummary
|
||||
.getFailedStorageLocations()));
|
||||
assertTrue(volumeFailureSummary.getLastVolumeFailureDate() > 0);
|
||||
long expectedCapacityLost = getExpectedCapacityLost(expectCapacityKnown,
|
||||
expectedFailedVolumes.length);
|
||||
|
@ -594,6 +599,30 @@ private void checkFailuresAtNameNode(DatanodeManager dm, DataNode dn,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the provided paths to absolute file paths.
|
||||
* @param locations the array of paths
|
||||
* @return array of absolute paths
|
||||
*/
|
||||
private String[] convertToAbsolutePaths(String[] locations) {
|
||||
if (locations == null || locations.length == 0) {
|
||||
return new String[0];
|
||||
}
|
||||
|
||||
String[] absolutePaths = new String[locations.length];
|
||||
for (int count = 0; count < locations.length; count++) {
|
||||
try {
|
||||
absolutePaths[count] = new File(new URI(locations[count]))
|
||||
.getAbsolutePath();
|
||||
} catch (URISyntaxException e) {
|
||||
//if the provided location is not an URI,
|
||||
//we use it as the absolute path
|
||||
absolutePaths[count] = locations[count];
|
||||
}
|
||||
}
|
||||
return absolutePaths;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns expected capacity lost for use in assertions. The return value is
|
||||
* dependent on whether or not it is expected that the volume capacities were
|
||||
|
|
|
@ -142,8 +142,8 @@ public void testAddStorageDirectories() throws IOException,
|
|||
for (NamespaceInfo ni : namespaceInfos) {
|
||||
storage.addStorageLocations(mockDN, ni, locations, START_OPT);
|
||||
for (StorageLocation sl : locations) {
|
||||
checkDir(sl.getFile());
|
||||
checkDir(sl.getFile(), ni.getBlockPoolID());
|
||||
checkDir(new File(sl.getUri()));
|
||||
checkDir(new File(sl.getUri()), ni.getBlockPoolID());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -173,8 +173,7 @@ public void testMissingVersion() throws IOException,
|
|||
List<StorageLocation> locations = createStorageLocations(numLocations);
|
||||
|
||||
StorageLocation firstStorage = locations.get(0);
|
||||
Storage.StorageDirectory sd = new Storage.StorageDirectory(
|
||||
firstStorage.getFile());
|
||||
Storage.StorageDirectory sd = new Storage.StorageDirectory(firstStorage);
|
||||
// the directory is not initialized so VERSION does not exist
|
||||
// create a fake directory under current/
|
||||
File currentDir = new File(sd.getCurrentDir(),
|
||||
|
|
|
@ -189,8 +189,8 @@ private void duplicateBlock(long blockId) throws IOException {
|
|||
// Volume without a copy of the block. Make a copy now.
|
||||
File sourceBlock = new File(b.getBlockURI());
|
||||
File sourceMeta = new File(b.getMetadataURI());
|
||||
URI sourceRoot = b.getVolume().getStorageLocation().getFile().toURI();
|
||||
URI destRoot = v.getStorageLocation().getFile().toURI();
|
||||
URI sourceRoot = b.getVolume().getStorageLocation().getUri();
|
||||
URI destRoot = v.getStorageLocation().getUri();
|
||||
|
||||
String relativeBlockPath =
|
||||
sourceRoot.relativize(sourceBlock.toURI())
|
||||
|
|
|
@ -199,8 +199,7 @@ public void testLocalDirs() throws Exception {
|
|||
try (FsDatasetSpi.FsVolumeReferences volumes =
|
||||
dn.getFSDataset().getFsVolumeReferences()) {
|
||||
for (FsVolumeSpi vol : volumes) {
|
||||
String dir = vol.getStorageLocation().getFile().getAbsolutePath();
|
||||
Path dataDir = new Path(dir);
|
||||
Path dataDir = new Path(vol.getStorageLocation().getNormalizedUri());
|
||||
FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
|
||||
assertEquals("Permission for dir: " + dataDir + ", is " + actual +
|
||||
", while expected is " + expected, expected, actual);
|
||||
|
|
|
@ -100,7 +100,7 @@ public static void stopLazyWriter(DataNode dn) {
|
|||
*/
|
||||
public static void assertFileLockReleased(String dir) throws IOException {
|
||||
StorageLocation sl = StorageLocation.parse(dir);
|
||||
File lockFile = new File(sl.getFile(), Storage.STORAGE_FILE_LOCK);
|
||||
File lockFile = new File(new File(sl.getUri()), Storage.STORAGE_FILE_LOCK);
|
||||
try (RandomAccessFile raf = new RandomAccessFile(lockFile, "rws");
|
||||
FileChannel channel = raf.getChannel()) {
|
||||
FileLock lock = channel.tryLock();
|
||||
|
|
|
@ -290,7 +290,7 @@ private void testDataNodeGetReconfigurationStatus(boolean expectedSuccuss)
|
|||
datanode.getConf());
|
||||
if (expectedSuccuss) {
|
||||
assertThat(locations.size(), is(1));
|
||||
assertThat(locations.get(0).getFile(), is(newDir));
|
||||
assertThat(new File(locations.get(0).getUri()), is(newDir));
|
||||
// Verify the directory is appropriately formatted.
|
||||
assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory());
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue