HDFS-9654. Code refactoring for HDFS-8578.

This commit is contained in:
Tsz-Wo Nicholas Sze 2016-01-28 10:56:01 +08:00
parent dca0dc8ac2
commit 662e17b46a
11 changed files with 297 additions and 200 deletions

View File

@ -2665,6 +2665,8 @@ Release 2.7.3 - UNRELEASED
HDFS-9634. webhdfs client side exceptions don't provide enough details HDFS-9634. webhdfs client side exceptions don't provide enough details
(Eric Payne via kihwal) (Eric Payne via kihwal)
HDFS-9654. Code refactoring for HDFS-8578. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -640,7 +640,8 @@ public abstract class Storage extends StorageInfo {
rename(getLastCheckpointTmp(), curDir); rename(getLastCheckpointTmp(), curDir);
return; return;
default: default:
throw new IOException("Unexpected FS state: " + curState); throw new IOException("Unexpected FS state: " + curState
+ " for storage directory: " + rootPath);
} }
} }

View File

@ -18,10 +18,21 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting; import java.io.File;
import com.google.common.base.Preconditions; import java.io.IOException;
import com.google.common.collect.Lists; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@ -34,18 +45,9 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import java.io.File; import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import com.google.common.base.Preconditions;
import java.util.ArrayList; import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** /**
* Manages storage for the set of BlockPoolSlices which share a particular * Manages storage for the set of BlockPoolSlices which share a particular
@ -136,15 +138,15 @@ public class BlockPoolSliceStorage extends Storage {
/** /**
* Load one storage directory. Recover from previous transitions if required. * Load one storage directory. Recover from previous transitions if required.
* *
* @param datanode datanode instance
* @param nsInfo namespace information * @param nsInfo namespace information
* @param dataDir the root path of the storage directory * @param dataDir the root path of the storage directory
* @param startOpt startup option * @param startOpt startup option
* @return the StorageDirectory successfully loaded. * @return the StorageDirectory successfully loaded.
* @throws IOException * @throws IOException
*/ */
private StorageDirectory loadStorageDirectory(DataNode datanode, private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) throws IOException { File dataDir, StartupOption startOpt, Configuration conf)
throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, true); StorageDirectory sd = new StorageDirectory(dataDir, null, true);
try { try {
StorageState curState = sd.analyzeStorage(startOpt, this); StorageState curState = sd.analyzeStorage(startOpt, this);
@ -158,8 +160,8 @@ public class BlockPoolSliceStorage extends Storage {
+ " does not exist"); + " does not exist");
case NOT_FORMATTED: // format case NOT_FORMATTED: // format
LOG.info("Block pool storage directory " + dataDir LOG.info("Block pool storage directory " + dataDir
+ " is not formatted for " + nsInfo.getBlockPoolID()); + " is not formatted for " + nsInfo.getBlockPoolID()
LOG.info("Formatting ..."); + ". Formatting ...");
format(sd, nsInfo); format(sd, nsInfo);
break; break;
default: // recovery part is common default: // recovery part is common
@ -170,10 +172,13 @@ public class BlockPoolSliceStorage extends Storage {
// Each storage directory is treated individually. // Each storage directory is treated individually.
// During startup some of them can upgrade or roll back // During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup. // while others could be up-to-date for the regular startup.
doTransition(datanode, sd, nsInfo, startOpt); if (doTransition(sd, nsInfo, startOpt, conf)) {
return sd;
}
if (getCTime() != nsInfo.getCTime()) { if (getCTime() != nsInfo.getCTime()) {
throw new IOException( throw new IOException("Datanode CTime (=" + getCTime()
"Data-node and name-node CTimes must be the same."); + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
} }
// 3. Update successfully loaded storage. // 3. Update successfully loaded storage.
@ -195,16 +200,15 @@ public class BlockPoolSliceStorage extends Storage {
* Therefore, a failure on loading any block pool storage results a faulty * Therefore, a failure on loading any block pool storage results a faulty
* data volume. * data volume.
* *
* @param datanode Datanode to which this storage belongs to
* @param nsInfo namespace information * @param nsInfo namespace information
* @param dataDirs storage directories of block pool * @param dataDirs storage directories of block pool
* @param startOpt startup option * @param startOpt startup option
* @return an array of loaded block pool directories. * @return an array of loaded block pool directories.
* @throws IOException on error * @throws IOException on error
*/ */
List<StorageDirectory> loadBpStorageDirectories( List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
DataNode datanode, NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt,
Collection<File> dataDirs, StartupOption startOpt) throws IOException { Configuration conf) throws IOException {
List<StorageDirectory> succeedDirs = Lists.newArrayList(); List<StorageDirectory> succeedDirs = Lists.newArrayList();
try { try {
for (File dataDir : dataDirs) { for (File dataDir : dataDirs) {
@ -213,8 +217,8 @@ public class BlockPoolSliceStorage extends Storage {
"BlockPoolSliceStorage.recoverTransitionRead: " + "BlockPoolSliceStorage.recoverTransitionRead: " +
"attempt to load an used block storage: " + dataDir); "attempt to load an used block storage: " + dataDir);
} }
StorageDirectory sd = final StorageDirectory sd = loadStorageDirectory(
loadStorageDirectory(datanode, nsInfo, dataDir, startOpt); nsInfo, dataDir, startOpt, conf);
succeedDirs.add(sd); succeedDirs.add(sd);
} }
} catch (IOException e) { } catch (IOException e) {
@ -232,19 +236,21 @@ public class BlockPoolSliceStorage extends Storage {
* Therefore, a failure on loading any block pool storage results a faulty * Therefore, a failure on loading any block pool storage results a faulty
* data volume. * data volume.
* *
* @param datanode Datanode to which this storage belongs to
* @param nsInfo namespace information * @param nsInfo namespace information
* @param dataDirs storage directories of block pool * @param dataDirs storage directories of block pool
* @param startOpt startup option * @param startOpt startup option
* @throws IOException on error * @throws IOException on error
*/ */
void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo, List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt) throws IOException { Collection<File> dataDirs, StartupOption startOpt, Configuration conf)
throws IOException {
LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID()); LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
for (StorageDirectory sd : loadBpStorageDirectories( final List<StorageDirectory> loaded = loadBpStorageDirectories(
datanode, nsInfo, dataDirs, startOpt)) { nsInfo, dataDirs, startOpt, conf);
for (StorageDirectory sd : loaded) {
addStorageDir(sd); addStorageDir(sd);
} }
return loaded;
} }
/** /**
@ -344,10 +350,10 @@ public class BlockPoolSliceStorage extends Storage {
* @param sd storage directory <SD>/current/<bpid> * @param sd storage directory <SD>/current/<bpid>
* @param nsInfo namespace info * @param nsInfo namespace info
* @param startOpt startup option * @param startOpt startup option
* @throws IOException * @return true if the new properties has been written.
*/ */
private void doTransition(DataNode datanode, StorageDirectory sd, private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
NamespaceInfo nsInfo, StartupOption startOpt) throws IOException { StartupOption startOpt, Configuration conf) throws IOException {
if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) { if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
Preconditions.checkState(!getTrashRootDir(sd).exists(), Preconditions.checkState(!getTrashRootDir(sd).exists(),
sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " + sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
@ -379,7 +385,7 @@ public class BlockPoolSliceStorage extends Storage {
} }
if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION
&& this.cTime == nsInfo.getCTime()) { && this.cTime == nsInfo.getCTime()) {
return; // regular startup return false; // regular startup
} }
if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION) { if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd)); int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
@ -389,8 +395,8 @@ public class BlockPoolSliceStorage extends Storage {
} }
if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) { || this.cTime < nsInfo.getCTime()) {
doUpgrade(datanode, sd, nsInfo); // upgrade doUpgrade(sd, nsInfo, conf); // upgrade
return; return true;
} }
// layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
// must shutdown // must shutdown
@ -418,16 +424,18 @@ public class BlockPoolSliceStorage extends Storage {
* @param nsInfo Namespace Info from the namenode * @param nsInfo Namespace Info from the namenode
* @throws IOException on error * @throws IOException on error
*/ */
void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo) private void doUpgrade(final StorageDirectory bpSd,
throws IOException { final NamespaceInfo nsInfo, final Configuration conf) throws IOException {
// Upgrading is applicable only to release with federation or after // Upgrading is applicable only to release with federation or after
if (!DataNodeLayoutVersion.supports( if (!DataNodeLayoutVersion.supports(
LayoutVersion.Feature.FEDERATION, layoutVersion)) { LayoutVersion.Feature.FEDERATION, layoutVersion)) {
return; return;
} }
final int oldLV = getLayoutVersion();
LOG.info("Upgrading block pool storage directory " + bpSd.getRoot() LOG.info("Upgrading block pool storage directory " + bpSd.getRoot()
+ ".\n old LV = " + this.getLayoutVersion() + "; old CTime = " + ".\n old LV = " + oldLV
+ this.getCTime() + ".\n new LV = " + HdfsServerConstants.DATANODE_LAYOUT_VERSION + "; old CTime = " + this.getCTime()
+ ".\n new LV = " + HdfsServerConstants.DATANODE_LAYOUT_VERSION
+ "; new CTime = " + nsInfo.getCTime()); + "; new CTime = " + nsInfo.getCTime());
// get <SD>/previous directory // get <SD>/previous directory
String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath()); String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath());
@ -438,8 +446,8 @@ public class BlockPoolSliceStorage extends Storage {
if (dnPrevDir.exists()) { if (dnPrevDir.exists()) {
deleteDir(dnPrevDir); deleteDir(dnPrevDir);
} }
File bpCurDir = bpSd.getCurrentDir(); final File bpCurDir = bpSd.getCurrentDir();
File bpPrevDir = bpSd.getPreviousDir(); final File bpPrevDir = bpSd.getPreviousDir();
assert bpCurDir.exists() : "BP level current directory must exist."; assert bpCurDir.exists() : "BP level current directory must exist.";
cleanupDetachDir(new File(bpCurDir, DataStorage.STORAGE_DIR_DETACHED)); cleanupDetachDir(new File(bpCurDir, DataStorage.STORAGE_DIR_DETACHED));
@ -447,15 +455,23 @@ public class BlockPoolSliceStorage extends Storage {
if (bpPrevDir.exists()) { if (bpPrevDir.exists()) {
deleteDir(bpPrevDir); deleteDir(bpPrevDir);
} }
File bpTmpDir = bpSd.getPreviousTmp(); final File bpTmpDir = bpSd.getPreviousTmp();
assert !bpTmpDir.exists() : "previous.tmp directory must not exist."; assert !bpTmpDir.exists() : "previous.tmp directory must not exist.";
// 2. Rename <SD>/current/<bpid>/current to // 2. Rename <SD>/current/<bpid>/current to
// <SD>/current/<bpid>/previous.tmp // <SD>/current/<bpid>/previous.tmp
rename(bpCurDir, bpTmpDir); rename(bpCurDir, bpTmpDir);
final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot();
doUgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
}
private void doUgrade(String name, final StorageDirectory bpSd,
NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir,
final File bpCurDir, final int oldLV, Configuration conf)
throws IOException {
// 3. Create new <SD>/current with block files hardlinks and VERSION // 3. Create new <SD>/current with block files hardlinks and VERSION
linkAllBlocks(datanode, bpTmpDir, bpCurDir); linkAllBlocks(bpTmpDir, bpCurDir, oldLV, conf);
this.layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION; this.layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION;
assert this.namespaceID == nsInfo.getNamespaceID() assert this.namespaceID == nsInfo.getNamespaceID()
: "Data-node and name-node layout versions must be the same."; : "Data-node and name-node layout versions must be the same.";
@ -465,8 +481,7 @@ public class BlockPoolSliceStorage extends Storage {
// 4.rename <SD>/current/<bpid>/previous.tmp to // 4.rename <SD>/current/<bpid>/previous.tmp to
// <SD>/current/<bpid>/previous // <SD>/current/<bpid>/previous
rename(bpTmpDir, bpPrevDir); rename(bpTmpDir, bpPrevDir);
LOG.info("Upgrade of block pool " + blockpoolID + " at " + bpSd.getRoot() LOG.info("Upgrade of " + name + " is complete");
+ " is complete");
} }
/** /**
@ -640,17 +655,17 @@ public class BlockPoolSliceStorage extends Storage {
* @param toDir the current data directory * @param toDir the current data directory
* @throws IOException if error occurs during hardlink * @throws IOException if error occurs during hardlink
*/ */
private void linkAllBlocks(DataNode datanode, File fromDir, File toDir) private static void linkAllBlocks(File fromDir, File toDir,
throws IOException { int diskLayoutVersion, Configuration conf) throws IOException {
// do the link // do the link
int diskLayoutVersion = this.getLayoutVersion();
// hardlink finalized blocks in tmpDir // hardlink finalized blocks in tmpDir
HardLink hardLink = new HardLink(); HardLink hardLink = new HardLink();
DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED), DataStorage.linkBlocks(fromDir, toDir, DataStorage.STORAGE_DIR_FINALIZED,
new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); diskLayoutVersion, hardLink, conf);
DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_RBW), DataStorage.linkBlocks(fromDir, toDir, DataStorage.STORAGE_DIR_RBW,
new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink); diskLayoutVersion, hardLink, conf);
LOG.info( hardLink.linkStats.report() ); LOG.info("Linked blocks from " + fromDir + " to " + toDir + ". "
+ hardLink.linkStats.report());
} }
/** /**

View File

@ -18,11 +18,29 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting; import java.io.File;
import com.google.common.collect.ComparisonChain; import java.io.IOException;
import com.google.common.collect.Lists; import java.io.RandomAccessFile;
import com.google.common.collect.Maps; import java.nio.channels.FileLock;
import com.google.common.util.concurrent.Futures; import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -47,28 +65,11 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker;
import java.io.File; import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import com.google.common.collect.ComparisonChain;
import java.io.RandomAccessFile; import com.google.common.collect.Lists;
import java.nio.channels.FileLock; import com.google.common.collect.Maps;
import java.nio.channels.OverlappingFileLockException; import com.google.common.util.concurrent.Futures;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/** /**
* Data storage information file. * Data storage information file.
@ -104,7 +105,7 @@ public class DataStorage extends Storage {
* upgraded from a pre-UUID version. For compatibility with prior * upgraded from a pre-UUID version. For compatibility with prior
* versions of Datanodes we cannot make this field a UUID. * versions of Datanodes we cannot make this field a UUID.
*/ */
private String datanodeUuid = null; private volatile String datanodeUuid = null;
// Maps block pool IDs to block pool storage // Maps block pool IDs to block pool storage
private final Map<String, BlockPoolSliceStorage> bpStorageMap private final Map<String, BlockPoolSliceStorage> bpStorageMap
@ -125,18 +126,28 @@ public class DataStorage extends Storage {
super(storageInfo); super(storageInfo);
} }
public synchronized String getDatanodeUuid() { public String getDatanodeUuid() {
return datanodeUuid; return datanodeUuid;
} }
public synchronized void setDatanodeUuid(String newDatanodeUuid) { public void setDatanodeUuid(String newDatanodeUuid) {
this.datanodeUuid = newDatanodeUuid; this.datanodeUuid = newDatanodeUuid;
} }
private static boolean createStorageID(StorageDirectory sd, int lv) {
// Clusters previously upgraded from layout versions earlier than
// ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a
// new storage ID. We check for that and fix it now.
final boolean haveValidStorageId = DataNodeLayoutVersion.supports(
LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, lv)
&& DatanodeStorage.isValidStorageId(sd.getStorageUuid());
return createStorageID(sd, !haveValidStorageId);
}
/** Create an ID for this storage. /** Create an ID for this storage.
* @return true if a new storage ID was generated. * @return true if a new storage ID was generated.
* */ * */
public synchronized boolean createStorageID( public static boolean createStorageID(
StorageDirectory sd, boolean regenerateStorageIds) { StorageDirectory sd, boolean regenerateStorageIds) {
final String oldStorageID = sd.getStorageUuid(); final String oldStorageID = sd.getStorageUuid();
if (oldStorageID == null || regenerateStorageIds) { if (oldStorageID == null || regenerateStorageIds) {
@ -250,7 +261,7 @@ public class DataStorage extends Storage {
private StorageDirectory loadStorageDirectory(DataNode datanode, private StorageDirectory loadStorageDirectory(DataNode datanode,
NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) NamespaceInfo nsInfo, File dataDir, StartupOption startOpt)
throws IOException { throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, false); StorageDirectory sd = new StorageDirectory(dataDir, null, false);
try { try {
StorageState curState = sd.analyzeStorage(startOpt, this); StorageState curState = sd.analyzeStorage(startOpt, this);
@ -263,9 +274,9 @@ public class DataStorage extends Storage {
throw new IOException("Storage directory " + dataDir throw new IOException("Storage directory " + dataDir
+ " does not exist"); + " does not exist");
case NOT_FORMATTED: // format case NOT_FORMATTED: // format
LOG.info("Storage directory " + dataDir + " is not formatted for " LOG.info("Storage directory " + dataDir
+ nsInfo.getBlockPoolID()); + " is not formatted for namespace " + nsInfo.getNamespaceID()
LOG.info("Formatting ..."); + ". Formatting...");
format(sd, nsInfo, datanode.getDatanodeUuid()); format(sd, nsInfo, datanode.getDatanodeUuid());
break; break;
default: // recovery part is common default: // recovery part is common
@ -276,7 +287,9 @@ public class DataStorage extends Storage {
// Each storage directory is treated individually. // Each storage directory is treated individually.
// During startup some of them can upgrade or roll back // During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup. // while others could be up-to-date for the regular startup.
doTransition(datanode, sd, nsInfo, startOpt); if (doTransition(sd, nsInfo, startOpt, datanode.getConf())) {
return sd;
}
// 3. Update successfully loaded storage. // 3. Update successfully loaded storage.
setServiceLayoutVersion(getServiceLayoutVersion()); setServiceLayoutVersion(getServiceLayoutVersion());
@ -321,20 +334,10 @@ public class DataStorage extends Storage {
nsInfo.getBlockPoolID(), new File(volume, STORAGE_DIR_CURRENT))); nsInfo.getBlockPoolID(), new File(volume, STORAGE_DIR_CURRENT)));
makeBlockPoolDataDir(bpDataDirs, null); makeBlockPoolDataDir(bpDataDirs, null);
BlockPoolSliceStorage bpStorage; final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
final String bpid = nsInfo.getBlockPoolID(); final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
synchronized (this) { nsInfo, bpDataDirs, StartupOption.HOTSWAP, datanode.getConf());
bpStorage = this.bpStorageMap.get(bpid); builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
if (bpStorage == null) {
bpStorage = new BlockPoolSliceStorage(
nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
nsInfo.getClusterID());
addBlockPoolStorage(bpid, bpStorage);
}
}
builder.addBpStorageDirectories(
bpid, bpStorage.loadBpStorageDirectories(
datanode, nsInfo, bpDataDirs, StartupOption.HOTSWAP));
} }
return builder; return builder;
} }
@ -347,53 +350,68 @@ public class DataStorage extends Storage {
* @param nsInfo namespace information * @param nsInfo namespace information
* @param dataDirs array of data storage directories * @param dataDirs array of data storage directories
* @param startOpt startup option * @param startOpt startup option
* @return a list of successfully loaded volumes. * @return a list of successfully loaded storage directories.
* @throws IOException
*/ */
@VisibleForTesting @VisibleForTesting
synchronized List<StorageLocation> addStorageLocations(DataNode datanode, synchronized List<StorageDirectory> addStorageLocations(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt) throws IOException { StartupOption startOpt) throws IOException {
final String bpid = nsInfo.getBlockPoolID(); final List<StorageLocation> successLocations = loadDataStorage(
List<StorageLocation> successVolumes = Lists.newArrayList(); datanode, nsInfo, dataDirs, startOpt);
return loadBlockPoolSliceStorage(
datanode, nsInfo, successLocations, startOpt);
}
private List<StorageLocation> loadDataStorage(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt) throws IOException {
final List<StorageLocation> success = Lists.newArrayList();
for (StorageLocation dataDir : dataDirs) { for (StorageLocation dataDir : dataDirs) {
File root = dataDir.getFile(); File root = dataDir.getFile();
if (!containsStorageDir(root)) { if (!containsStorageDir(root)) {
try { try {
// It first ensures the datanode level format is completed. // It first ensures the datanode level format is completed.
StorageDirectory sd = loadStorageDirectory( final StorageDirectory sd = loadStorageDirectory(
datanode, nsInfo, root, startOpt); datanode, nsInfo, root, startOpt);
addStorageDir(sd); addStorageDir(sd);
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e); LOG.warn("Failed to add storage directory " + dataDir, e);
continue; continue;
} }
} else { } else {
LOG.info("Storage directory " + dataDir + " has already been used."); LOG.info("Storage directory " + dataDir + " has already been used.");
} }
success.add(dataDir);
}
return success;
}
private List<StorageDirectory> loadBlockPoolSliceStorage(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt) throws IOException {
final String bpid = nsInfo.getBlockPoolID();
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
final List<StorageDirectory> success = Lists.newArrayList();
for (StorageLocation dataDir : dataDirs) {
final File curDir = new File(dataDir.getFile(), STORAGE_DIR_CURRENT);
List<File> bpDataDirs = new ArrayList<File>(); List<File> bpDataDirs = new ArrayList<File>();
bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(bpid, new File(root, bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(bpid, curDir));
STORAGE_DIR_CURRENT)));
try { try {
makeBlockPoolDataDir(bpDataDirs, null); makeBlockPoolDataDir(bpDataDirs, null);
BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
if (bpStorage == null) {
bpStorage = new BlockPoolSliceStorage(
nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
nsInfo.getClusterID());
}
bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt); final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
addBlockPoolStorage(bpid, bpStorage); nsInfo, bpDataDirs, startOpt, datanode.getConf());
for(StorageDirectory sd : dirs) {
success.add(sd);
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to add storage for block pool: " + bpid + " : " LOG.warn("Failed to add storage directory " + dataDir
+ e.getMessage()); + " for block pool " + bpid, e);
continue;
} }
successVolumes.add(dataDir);
} }
return successVolumes;
return success;
} }
/** /**
@ -635,17 +653,13 @@ public class DataStorage extends Storage {
* Upgrade if this.LV > LAYOUT_VERSION * Upgrade if this.LV > LAYOUT_VERSION
* Regular startup if this.LV = LAYOUT_VERSION * Regular startup if this.LV = LAYOUT_VERSION
* *
* @param datanode Datanode to which this storage belongs to
* @param sd storage directory * @param sd storage directory
* @param nsInfo namespace info * @param nsInfo namespace info
* @param startOpt startup option * @param startOpt startup option
* @throws IOException * @return true if the new properties has been written.
*/ */
private void doTransition( DataNode datanode, private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
StorageDirectory sd, StartupOption startOpt, Configuration conf) throws IOException {
NamespaceInfo nsInfo,
StartupOption startOpt
) throws IOException {
if (startOpt == StartupOption.ROLLBACK) { if (startOpt == StartupOption.ROLLBACK) {
doRollback(sd, nsInfo); // rollback if applicable doRollback(sd, nsInfo); // rollback if applicable
} }
@ -674,25 +688,16 @@ public class DataStorage extends Storage {
+ nsInfo.getClusterID() + "; datanode clusterID = " + getClusterID()); + nsInfo.getClusterID() + "; datanode clusterID = " + getClusterID());
} }
// Clusters previously upgraded from layout versions earlier than
// ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a
// new storage ID. We check for that and fix it now.
boolean haveValidStorageId =
DataNodeLayoutVersion.supports(
LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, layoutVersion) &&
DatanodeStorage.isValidStorageId(sd.getStorageUuid());
// regular start up. // regular start up.
if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION) { if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
createStorageID(sd, !haveValidStorageId); createStorageID(sd, layoutVersion);
return; // regular startup return false; // need to write properties
} }
// do upgrade // do upgrade
if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION) { if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
doUpgrade(datanode, sd, nsInfo); // upgrade doUpgrade(sd, nsInfo, conf); // upgrade
createStorageID(sd, !haveValidStorageId); return true; // doUgrade already has written properties
return;
} }
// layoutVersion < DATANODE_LAYOUT_VERSION. I.e. stored layout version is newer // layoutVersion < DATANODE_LAYOUT_VERSION. I.e. stored layout version is newer
@ -726,8 +731,8 @@ public class DataStorage extends Storage {
* @param sd storage directory * @param sd storage directory
* @throws IOException on error * @throws IOException on error
*/ */
void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo) void doUpgrade(final StorageDirectory sd, final NamespaceInfo nsInfo,
throws IOException { final Configuration conf) throws IOException {
// If the existing on-disk layout version supportes federation, simply // If the existing on-disk layout version supportes federation, simply
// update its layout version. // update its layout version.
if (DataNodeLayoutVersion.supports( if (DataNodeLayoutVersion.supports(
@ -743,15 +748,16 @@ public class DataStorage extends Storage {
return; return;
} }
final int oldLV = getLayoutVersion();
LOG.info("Upgrading storage directory " + sd.getRoot() LOG.info("Upgrading storage directory " + sd.getRoot()
+ ".\n old LV = " + this.getLayoutVersion() + ".\n old LV = " + oldLV
+ "; old CTime = " + this.getCTime() + "; old CTime = " + this.getCTime()
+ ".\n new LV = " + HdfsServerConstants.DATANODE_LAYOUT_VERSION + ".\n new LV = " + HdfsServerConstants.DATANODE_LAYOUT_VERSION
+ "; new CTime = " + nsInfo.getCTime()); + "; new CTime = " + nsInfo.getCTime());
File curDir = sd.getCurrentDir(); final File curDir = sd.getCurrentDir();
File prevDir = sd.getPreviousDir(); final File prevDir = sd.getPreviousDir();
File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW); final File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
assert curDir.exists() : "Data node current directory must exist."; assert curDir.exists() : "Data node current directory must exist.";
// Cleanup directory "detach" // Cleanup directory "detach"
@ -761,20 +767,28 @@ public class DataStorage extends Storage {
if (prevDir.exists()) if (prevDir.exists())
deleteDir(prevDir); deleteDir(prevDir);
// get previous.tmp directory, <SD>/previous.tmp // get previous.tmp directory, <SD>/previous.tmp
File tmpDir = sd.getPreviousTmp(); final File tmpDir = sd.getPreviousTmp();
assert !tmpDir.exists() : assert !tmpDir.exists() :
"Data node previous.tmp directory must not exist."; "Data node previous.tmp directory must not exist.";
// 2. Rename <SD>/current to <SD>/previous.tmp // 2. Rename <SD>/current to <SD>/previous.tmp
rename(curDir, tmpDir); rename(curDir, tmpDir);
// 3. Format BP and hard link blocks from previous directory // 3.1. Format BP
File curBpDir = BlockPoolSliceStorage.getBpRoot(nsInfo.getBlockPoolID(), curDir); File curBpDir = BlockPoolSliceStorage.getBpRoot(nsInfo.getBlockPoolID(), curDir);
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
bpStorage.format(curDir, nsInfo); bpStorage.format(curDir, nsInfo);
linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir,
STORAGE_DIR_CURRENT)); final File toDir = new File(curBpDir, STORAGE_DIR_CURRENT);
doUgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
}
private void doUgrade(final StorageDirectory sd,
final NamespaceInfo nsInfo, final File prevDir,
final File tmpDir, final File bbwDir, final File toDir, final int oldLV,
Configuration conf) throws IOException {
// 3.2. Link block files from <SD>/previous.tmp to <SD>/current
linkAllBlocks(tmpDir, bbwDir, toDir, oldLV, conf);
// 4. Write version file under <SD>/current // 4. Write version file under <SD>/current
layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION; layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION;
@ -784,7 +798,8 @@ public class DataStorage extends Storage {
// 5. Rename <SD>/previous.tmp to <SD>/previous // 5. Rename <SD>/previous.tmp to <SD>/previous
rename(tmpDir, prevDir); rename(tmpDir, prevDir);
LOG.info("Upgrade of " + sd.getRoot()+ " is complete"); LOG.info("Upgrade of " + sd.getRoot()+ " is complete");
addBlockPoolStorage(nsInfo.getBlockPoolID(), bpStorage);
createStorageID(sd, layoutVersion);
} }
/** /**
@ -952,23 +967,22 @@ public class DataStorage extends Storage {
* *
* @throws IOException If error occurs during hardlink * @throws IOException If error occurs during hardlink
*/ */
private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir, private static void linkAllBlocks(File fromDir, File fromBbwDir, File toDir,
File toDir) throws IOException { int diskLayoutVersion, Configuration conf) throws IOException {
HardLink hardLink = new HardLink(); HardLink hardLink = new HardLink();
// do the link // do the link
int diskLayoutVersion = this.getLayoutVersion();
if (DataNodeLayoutVersion.supports( if (DataNodeLayoutVersion.supports(
LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) { LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
// hardlink finalized blocks in tmpDir/finalized // hardlink finalized blocks in tmpDir/finalized
linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED), linkBlocks(fromDir, toDir, STORAGE_DIR_FINALIZED,
new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); diskLayoutVersion, hardLink, conf);
// hardlink rbw blocks in tmpDir/rbw // hardlink rbw blocks in tmpDir/rbw
linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW), linkBlocks(fromDir, toDir, STORAGE_DIR_RBW,
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); diskLayoutVersion, hardLink, conf);
} else { // pre-RBW version } else { // pre-RBW version
// hardlink finalized blocks in tmpDir // hardlink finalized blocks in tmpDir
linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED), linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
diskLayoutVersion, hardLink); diskLayoutVersion, hardLink, conf);
if (fromBbwDir.exists()) { if (fromBbwDir.exists()) {
/* /*
* We need to put the 'blocksBeingWritten' from HDFS 1.x into the rbw * We need to put the 'blocksBeingWritten' from HDFS 1.x into the rbw
@ -976,11 +990,12 @@ public class DataStorage extends Storage {
* NOT underneath the 'current' directory in those releases. See * NOT underneath the 'current' directory in those releases. See
* HDFS-3731 for details. * HDFS-3731 for details.
*/ */
linkBlocks(datanode, fromBbwDir, linkBlocks(fromBbwDir, new File(toDir, STORAGE_DIR_RBW),
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); diskLayoutVersion, hardLink, conf);
} }
} }
LOG.info( hardLink.linkStats.report() ); LOG.info("Linked blocks from " + fromDir + " to " + toDir + ". "
+ hardLink.linkStats.report());
} }
private static class LinkArgs { private static class LinkArgs {
@ -993,8 +1008,15 @@ public class DataStorage extends Storage {
} }
} }
static void linkBlocks(DataNode datanode, File from, File to, int oldLV, static void linkBlocks(File fromDir, File toDir, String subdir, int oldLV,
HardLink hl) throws IOException { HardLink hl, Configuration conf) throws IOException {
linkBlocks(new File(fromDir, subdir), new File(toDir, subdir),
oldLV, hl, conf);
}
private static void linkBlocks(File from, File to, int oldLV,
HardLink hl, Configuration conf) throws IOException {
LOG.info("Start linking block files from " + from + " to " + to);
boolean upgradeToIdBasedLayout = false; boolean upgradeToIdBasedLayout = false;
// If we are upgrading from a version older than the one where we introduced // If we are upgrading from a version older than the one where we introduced
// block ID-based layout AND we're working with the finalized directory, // block ID-based layout AND we're working with the finalized directory,
@ -1017,7 +1039,7 @@ public class DataStorage extends Storage {
removeDuplicateEntries(idBasedLayoutSingleLinks, duplicates); removeDuplicateEntries(idBasedLayoutSingleLinks, duplicates);
} }
int numLinkWorkers = datanode.getConf().getInt( final int numLinkWorkers = conf.getInt(
DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY, DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS); DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers); ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers);
@ -1268,13 +1290,19 @@ public class DataStorage extends Storage {
} }
/** /**
* Add bpStorage into bpStorageMap * Get the BlockPoolSliceStorage from {@link bpStorageMap}.
* If the object is not found, create a new object and put it to the map.
*/ */
private void addBlockPoolStorage(String bpID, BlockPoolSliceStorage bpStorage synchronized BlockPoolSliceStorage getBlockPoolSliceStorage(
) { final NamespaceInfo nsInfo) {
if (!this.bpStorageMap.containsKey(bpID)) { final String bpid = nsInfo.getBlockPoolID();
this.bpStorageMap.put(bpID, bpStorage); BlockPoolSliceStorage bpStorage = bpStorageMap.get(bpid);
if (bpStorage == null) {
bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), bpid,
nsInfo.getCTime(), nsInfo.getClusterID());
bpStorageMap.put(bpid, bpStorage);
} }
return bpStorage;
} }
synchronized void removeBlockPoolStorage(String bpId) { synchronized void removeBlockPoolStorage(String bpId) {

View File

@ -101,4 +101,19 @@ public class StorageLocation {
public String toString() { public String toString() {
return "[" + storageType + "]" + file.toURI(); return "[" + storageType + "]" + file.toURI();
} }
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj == null || !(obj instanceof StorageLocation)) {
return false;
}
return toString().equals(obj.toString());
}
@Override
public int hashCode() {
return toString().hashCode();
}
} }

View File

@ -508,7 +508,8 @@ public class TestReplication {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setLong( conf.setLong(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1); DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.storagesPerDatanode(1).build();
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/test"); Path filePath = new Path("/test");
FSDataOutputStream create = fs.create(filePath); FSDataOutputStream create = fs.create(filePath);

View File

@ -477,7 +477,7 @@ public class UpgradeUtilities {
for (int i = 0; i < parent.length; i++) { for (int i = 0; i < parent.length; i++) {
File versionFile = new File(parent[i], "VERSION"); File versionFile = new File(parent[i], "VERSION");
StorageDirectory sd = new StorageDirectory(parent[i].getParentFile()); StorageDirectory sd = new StorageDirectory(parent[i].getParentFile());
storage.createStorageID(sd, false); DataStorage.createStorageID(sd, false);
storage.writeProperties(versionFile, sd); storage.writeProperties(versionFile, sd);
versionFiles[i] = versionFile; versionFiles[i] = versionFile;
File bpDir = BlockPoolSliceStorage.getBpRoot(bpid, parent[i]); File bpDir = BlockPoolSliceStorage.getBpRoot(bpid, parent[i]);

View File

@ -537,7 +537,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
this.datanode = datanode; this.datanode = datanode;
if (storage != null) { if (storage != null) {
for (int i = 0; i < storage.getNumStorageDirs(); ++i) { for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
storage.createStorageID(storage.getStorageDir(i), false); DataStorage.createStorageID(storage.getStorageDir(i), false);
} }
this.datanodeUuid = storage.getDatanodeUuid(); this.datanodeUuid = storage.getDatanodeUuid();
} else { } else {

View File

@ -51,10 +51,12 @@ import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
@ -288,15 +290,27 @@ public class TestDataNodeHotSwapVolumes {
String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(","); String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
String[] expectDataDirs = newDataDir.split(","); String[] expectDataDirs = newDataDir.split(",");
assertEquals(expectDataDirs.length, effectiveDataDirs.length); assertEquals(expectDataDirs.length, effectiveDataDirs.length);
List<StorageLocation> expectedStorageLocations = new ArrayList<>();
List<StorageLocation> effectiveStorageLocations = new ArrayList<>();
for (int i = 0; i < expectDataDirs.length; i++) { for (int i = 0; i < expectDataDirs.length; i++) {
StorageLocation expectLocation = StorageLocation.parse(expectDataDirs[i]); StorageLocation expectLocation = StorageLocation.parse(expectDataDirs[i]);
StorageLocation effectiveLocation = StorageLocation effectiveLocation = StorageLocation
StorageLocation.parse(effectiveDataDirs[i]); .parse(effectiveDataDirs[i]);
assertEquals(expectLocation.getStorageType(), expectedStorageLocations.add(expectLocation);
effectiveLocation.getStorageType()); effectiveStorageLocations.add(effectiveLocation);
assertEquals(expectLocation.getFile().getCanonicalFile(),
effectiveLocation.getFile().getCanonicalFile());
} }
Comparator<StorageLocation> comparator = new Comparator<StorageLocation>() {
@Override
public int compare(StorageLocation o1, StorageLocation o2) {
return o1.toString().compareTo(o2.toString());
}
};
Collections.sort(expectedStorageLocations, comparator);
Collections.sort(effectiveStorageLocations, comparator);
assertEquals("Effective volumes doesnt match expected",
expectedStorageLocations, effectiveStorageLocations);
// Check that all newly created volumes are appropriately formatted. // Check that all newly created volumes are appropriately formatted.
for (File volumeDir : newVolumeDirs) { for (File volumeDir : newVolumeDirs) {
@ -473,11 +487,27 @@ public class TestDataNodeHotSwapVolumes {
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
Collection<String> oldDirs = getDataDirs(dn); Collection<String> oldDirs = getDataDirs(dn);
String newDirs = oldDirs.iterator().next(); // Keep the first volume. // Findout the storage with block and remove it
ExtendedBlock block =
DFSTestUtil.getAllBlocks(fs, testFile).get(1).getBlock();
FsVolumeSpi volumeWithBlock = dn.getFSDataset().getVolume(block);
String basePath = volumeWithBlock.getBasePath();
File storageDir = new File(basePath);
URI fileUri = storageDir.toURI();
String dirWithBlock =
"[" + volumeWithBlock.getStorageType() + "]" + fileUri;
String newDirs = dirWithBlock;
for (String dir : oldDirs) {
if (dirWithBlock.startsWith(dir)) {
continue;
}
newDirs = dir;
break;
}
dn.reconfigurePropertyImpl( dn.reconfigurePropertyImpl(
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs); DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
assertFileLocksReleased( oldDirs.remove(newDirs);
new ArrayList<String>(oldDirs).subList(1, oldDirs.size())); assertFileLocksReleased(oldDirs);
triggerDeleteReport(dn); triggerDeleteReport(dn);

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After; import org.junit.After;
@ -54,11 +57,13 @@ public class TestDataStorage {
@Before @Before
public void setUp() throws IOException { public void setUp() throws IOException {
Configuration conf = new HdfsConfiguration();
storage = new DataStorage(); storage = new DataStorage();
nsInfo = new NamespaceInfo(0, CLUSTER_ID, DEFAULT_BPID, CTIME, nsInfo = new NamespaceInfo(0, CLUSTER_ID, DEFAULT_BPID, CTIME,
BUILD_VERSION, SOFTWARE_VERSION); BUILD_VERSION, SOFTWARE_VERSION);
FileUtil.fullyDelete(TEST_DIR); FileUtil.fullyDelete(TEST_DIR);
assertTrue("Failed to make test dir.", TEST_DIR.mkdirs()); assertTrue("Failed to make test dir.", TEST_DIR.mkdirs());
Mockito.when(mockDN.getConf()).thenReturn(conf);
} }
@After @After
@ -146,7 +151,7 @@ public class TestDataStorage {
assertEquals(numLocations, storage.getNumStorageDirs()); assertEquals(numLocations, storage.getNumStorageDirs());
locations = createStorageLocations(numLocations); locations = createStorageLocations(numLocations);
List<StorageLocation> addedLocation = List<StorageDirectory> addedLocation =
storage.addStorageLocations(mockDN, namespaceInfos.get(0), storage.addStorageLocations(mockDN, namespaceInfos.get(0),
locations, START_OPT); locations, START_OPT);
assertTrue(addedLocation.isEmpty()); assertTrue(addedLocation.isEmpty());

View File

@ -110,7 +110,7 @@ public class TestFsDatasetImpl {
private static Storage.StorageDirectory createStorageDirectory(File root) { private static Storage.StorageDirectory createStorageDirectory(File root) {
Storage.StorageDirectory sd = new Storage.StorageDirectory(root); Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
dsForStorageUuid.createStorageID(sd, false); DataStorage.createStorageID(sd, false);
return sd; return sd;
} }