HDFS-7035. Make adding a new data directory to the DataNode an atomic operation and improve error handling (Lei Xu via Colin P. McCabe)

This commit is contained in:
Colin Patrick Mccabe 2014-10-30 17:31:23 -07:00
parent 348bfb7d59
commit a9331fe9b0
12 changed files with 587 additions and 447 deletions

View File

@ -322,6 +322,9 @@ Release 2.7.0 - UNRELEASED
HDFS-3342. SocketTimeoutException in BlockSender.sendChunks could
have a better error message. (Yongjun Zhang via wang)
HDFS-7035. Make adding a new data directory to the DataNode an atomic
operation and improve error handling (Lei Xu via Colin P. McCabe)
OPTIMIZATIONS
BUG FIXES

View File

@ -820,6 +820,21 @@ public abstract class Storage extends StorageInfo {
storageDirs.add(sd);
}
/**
* Returns true if the storage directory on the given directory is already
* loaded.
* @param root the root directory of a {@link StorageDirectory}
* @throws IOException if failed to get canonical path.
*/
protected boolean containsStorageDir(File root) throws IOException {
for (StorageDirectory sd : storageDirs) {
if (sd.getRoot().getCanonicalPath().equals(root.getCanonicalPath())) {
return true;
}
}
return false;
}
/**
* Return true if the layout of the given storage directory is from a version
* of Hadoop prior to the introduction of the "current" and "previous"

View File

@ -217,6 +217,10 @@ public class StorageInfo {
namespaceID = nsId;
}
public void setServiceLayoutVersion(int lv) {
this.layoutVersion = lv;
}
public int getServiceLayoutVersion() {
return storageType == NodeType.DATA_NODE ? HdfsConstants.DATANODE_LAYOUT_VERSION
: HdfsConstants.NAMENODE_LAYOUT_VERSION;

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
@ -35,9 +36,7 @@ import org.apache.hadoop.util.Daemon;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@ -127,9 +126,110 @@ public class BlockPoolSliceStorage extends Storage {
new ConcurrentHashMap<String, Boolean>());
}
// Expose visibility for VolumeBuilder#commit().
public void addStorageDir(StorageDirectory sd) {
super.addStorageDir(sd);
}
/**
* Load one storage directory. Recover from previous transitions if required.
*
* @param datanode datanode instance
* @param nsInfo namespace information
* @param dataDir the root path of the storage directory
* @param startOpt startup option
* @return the StorageDirectory successfully loaded.
* @throws IOException
*/
private StorageDirectory loadStorageDirectory(DataNode datanode,
NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, true);
try {
StorageState curState = sd.analyzeStorage(startOpt, this);
// sd is locked but not opened
switch (curState) {
case NORMAL:
break;
case NON_EXISTENT:
LOG.info("Block pool storage directory " + dataDir + " does not exist");
throw new IOException("Storage directory " + dataDir
+ " does not exist");
case NOT_FORMATTED: // format
LOG.info("Block pool storage directory " + dataDir
+ " is not formatted for " + nsInfo.getBlockPoolID());
LOG.info("Formatting ...");
format(sd, nsInfo);
break;
default: // recovery part is common
sd.doRecover(curState);
}
// 2. Do transitions
// Each storage directory is treated individually.
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
doTransition(datanode, sd, nsInfo, startOpt);
if (getCTime() != nsInfo.getCTime()) {
throw new IOException(
"Data-node and name-node CTimes must be the same.");
}
// 3. Update successfully loaded storage.
setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd);
return sd;
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
}
/**
* Analyze and load storage directories. Recover from previous transitions if
* required.
*
* The block pool storages are either all analyzed or none of them is loaded.
* Therefore, a failure on loading any block pool storage results a faulty
* data volume.
*
* @param datanode Datanode to which this storage belongs to
* @param nsInfo namespace information
* @param dataDirs storage directories of block pool
* @param startOpt startup option
* @return an array of loaded block pool directories.
* @throws IOException on error
*/
List<StorageDirectory> loadBpStorageDirectories(
DataNode datanode, NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt) throws IOException {
List<StorageDirectory> succeedDirs = Lists.newArrayList();
try {
for (File dataDir : dataDirs) {
if (containsStorageDir(dataDir)) {
throw new IOException(
"BlockPoolSliceStorage.recoverTransitionRead: " +
"attempt to load an used block storage: " + dataDir);
}
StorageDirectory sd =
loadStorageDirectory(datanode, nsInfo, dataDir, startOpt);
succeedDirs.add(sd);
}
} catch (IOException e) {
LOG.warn("Failed to analyze storage directories for block pool "
+ nsInfo.getBlockPoolID(), e);
throw e;
}
return succeedDirs;
}
/**
* Analyze storage directories. Recover from previous transitions if required.
*
* The block pool storages are either all analyzed or none of them is loaded.
* Therefore, a failure on loading any block pool storage results a faulty
* data volume.
*
* @param datanode Datanode to which this storage belongs to
* @param nsInfo namespace information
* @param dataDirs storage directories of block pool
@ -139,68 +239,10 @@ public class BlockPoolSliceStorage extends Storage {
void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt) throws IOException {
LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
Set<String> existingStorageDirs = new HashSet<String>();
for (int i = 0; i < getNumStorageDirs(); i++) {
existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
}
// 1. For each BP data directory analyze the state and
// check whether all is consistent before transitioning.
ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
dataDirs.size());
for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next();
if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
LOG.info("Storage directory " + dataDir + " has already been used.");
it.remove();
continue;
}
StorageDirectory sd = new StorageDirectory(dataDir, null, true);
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt, this);
// sd is locked but not opened
switch (curState) {
case NORMAL:
break;
case NON_EXISTENT:
// ignore this storage
LOG.info("Storage directory " + dataDir + " does not exist.");
it.remove();
continue;
case NOT_FORMATTED: // format
LOG.info("Storage directory " + dataDir + " is not formatted.");
LOG.info("Formatting ...");
format(sd, nsInfo);
break;
default: // recovery part is common
sd.doRecover(curState);
}
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
// add to the storage list. This is inherited from parent class, Storage.
for (StorageDirectory sd : loadBpStorageDirectories(
datanode, nsInfo, dataDirs, startOpt)) {
addStorageDir(sd);
dataDirStates.add(curState);
}
if (dataDirs.size() == 0) // none of the data dirs exist
throw new IOException(
"All specified directories are not accessible or do not exist.");
// 2. Do transitions
// Each storage directory is treated individually.
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
assert getCTime() == nsInfo.getCTime()
: "Data-node and name-node CTimes must be the same.";
}
// 3. Update all storages. Some of them might have just been formatted.
this.writeAll();
}
/**

View File

@ -44,12 +44,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.BufferedOutputStream;
@ -81,6 +77,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
@ -182,7 +182,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.tracing.TraceAdminPB;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
@ -451,20 +450,27 @@ public class DataNode extends ReconfigurableBase
*/
@VisibleForTesting
static class ChangedVolumes {
/** The storage locations of the newly added volumes. */
List<StorageLocation> newLocations = Lists.newArrayList();
/** The storage locations of the volumes that are removed. */
List<StorageLocation> deactivateLocations = Lists.newArrayList();
/** The unchanged locations that existed in the old configuration. */
List<StorageLocation> unchangedLocations = Lists.newArrayList();
}
/**
* Parse the new DFS_DATANODE_DATA_DIR value in the configuration to detect
* changed volumes.
* @param newVolumes a comma separated string that specifies the data volumes.
* @return changed volumes.
* @throws IOException if none of the directories are specified in the
* configuration.
*/
@VisibleForTesting
ChangedVolumes parseChangedVolumes() throws IOException {
List<StorageLocation> locations = getStorageLocations(getConf());
ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
Configuration conf = new Configuration();
conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
List<StorageLocation> locations = getStorageLocations(conf);
if (locations.isEmpty()) {
throw new IOException("No directory is specified.");
@ -479,9 +485,11 @@ public class DataNode extends ReconfigurableBase
boolean found = false;
for (Iterator<StorageLocation> sl = results.newLocations.iterator();
sl.hasNext(); ) {
if (sl.next().getFile().getCanonicalPath().equals(
StorageLocation location = sl.next();
if (location.getFile().getCanonicalPath().equals(
dir.getRoot().getCanonicalPath())) {
sl.remove();
results.unchangedLocations.add(location);
found = true;
break;
}
@ -499,18 +507,21 @@ public class DataNode extends ReconfigurableBase
/**
* Attempts to reload data volumes with new configuration.
* @param newVolumes a comma separated string that specifies the data volumes.
* @throws Exception
* @throws IOException on error. If an IOException is thrown, some new volumes
* may have been successfully added and removed.
*/
private synchronized void refreshVolumes(String newVolumes) throws Exception {
private synchronized void refreshVolumes(String newVolumes) throws IOException {
Configuration conf = getConf();
conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
List<StorageLocation> locations = getStorageLocations(conf);
final int numOldDataDirs = dataDirs.size();
dataDirs = locations;
ChangedVolumes changedVolumes = parseChangedVolumes();
int numOldDataDirs = dataDirs.size();
ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes);
StringBuilder errorMessageBuilder = new StringBuilder();
List<String> effectiveVolumes = Lists.newArrayList();
for (StorageLocation sl : changedVolumes.unchangedLocations) {
effectiveVolumes.add(sl.toString());
}
try {
if (numOldDataDirs + changedVolumes.newLocations.size() -
changedVolumes.deactivateLocations.size() <= 0) {
@ -521,34 +532,43 @@ public class DataNode extends ReconfigurableBase
Joiner.on(",").join(changedVolumes.newLocations));
// Add volumes for each Namespace
final List<NamespaceInfo> nsInfos = Lists.newArrayList();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
NamespaceInfo nsInfo = bpos.getNamespaceInfo();
LOG.info("Loading volumes for namesapce: " + nsInfo.getNamespaceID());
storage.addStorageLocations(
this, nsInfo, changedVolumes.newLocations, StartupOption.HOTSWAP);
nsInfos.add(bpos.getNamespaceInfo());
}
List<String> bpids = Lists.newArrayList();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
bpids.add(bpos.getBlockPoolId());
ExecutorService service = Executors.newFixedThreadPool(
changedVolumes.newLocations.size());
List<Future<IOException>> exceptions = Lists.newArrayList();
for (final StorageLocation location : changedVolumes.newLocations) {
exceptions.add(service.submit(new Callable<IOException>() {
@Override
public IOException call() {
try {
data.addVolume(location, nsInfos);
} catch (IOException e) {
return e;
}
return null;
}
}));
}
List<StorageLocation> succeedVolumes =
data.addVolumes(changedVolumes.newLocations, bpids);
if (succeedVolumes.size() < changedVolumes.newLocations.size()) {
List<StorageLocation> failedVolumes = Lists.newArrayList();
// Clean all failed volumes.
for (StorageLocation location : changedVolumes.newLocations) {
if (!succeedVolumes.contains(location)) {
errorMessageBuilder.append("FAILED TO ADD:");
failedVolumes.add(location);
for (int i = 0; i < changedVolumes.newLocations.size(); i++) {
StorageLocation volume = changedVolumes.newLocations.get(i);
Future<IOException> ioExceptionFuture = exceptions.get(i);
try {
IOException ioe = ioExceptionFuture.get();
if (ioe != null) {
errorMessageBuilder.append(String.format("FAILED TO ADD: %s: %s\n",
volume.toString(), ioe.getMessage()));
} else {
errorMessageBuilder.append("ADDED:");
effectiveVolumes.add(volume.toString());
}
errorMessageBuilder.append(location);
errorMessageBuilder.append("\n");
LOG.info("Storage directory is loaded: " + volume.toString());
} catch (Exception e) {
errorMessageBuilder.append(String.format("FAILED to ADD: %s: %s\n",
volume.toString(), e.getMessage()));
}
storage.removeVolumes(failedVolumes);
data.removeVolumes(failedVolumes);
}
}
@ -557,15 +577,20 @@ public class DataNode extends ReconfigurableBase
Joiner.on(",").join(changedVolumes.deactivateLocations));
data.removeVolumes(changedVolumes.deactivateLocations);
try {
storage.removeVolumes(changedVolumes.deactivateLocations);
} catch (IOException e) {
errorMessageBuilder.append(e.getMessage());
}
}
if (errorMessageBuilder.length() > 0) {
throw new IOException(errorMessageBuilder.toString());
}
} catch (IOException e) {
LOG.warn("There is IOException when refresh volumes! ", e);
throw e;
} finally {
conf.set(DFS_DATANODE_DATA_DIR_KEY,
Joiner.on(",").join(effectiveVolumes));
dataDirs = getStorageLocations(conf);
}
}
@ -1304,7 +1329,7 @@ public class DataNode extends ReconfigurableBase
final String bpid = nsInfo.getBlockPoolID();
//read storage info, lock data dirs and transition fs state if necessary
synchronized (this) {
storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
}
final StorageInfo bpStorage = storage.getBPStorage(bpid);
LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@ -190,13 +193,145 @@ public class DataStorage extends Storage {
}
/**
* {{@inheritDoc org.apache.hadoop.hdfs.server.common.Storage#writeAll()}}
* VolumeBuilder holds the metadata (e.g., the storage directories) of the
* prepared volume returned from {@link prepareVolume()}. Calling {@link build()}
* to add the metadata to {@link DataStorage} so that this prepared volume can
* be active.
*/
private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
this.layoutVersion = getServiceLayoutVersion();
for (StorageDirectory dir : dirs) {
writeProperties(dir);
@InterfaceAudience.Private
@InterfaceStability.Unstable
static public class VolumeBuilder {
private DataStorage storage;
/** Volume level storage directory. */
private StorageDirectory sd;
/** Mapping from block pool ID to an array of storage directories. */
private Map<String, List<StorageDirectory>> bpStorageDirMap =
Maps.newHashMap();
@VisibleForTesting
public VolumeBuilder(DataStorage storage, StorageDirectory sd) {
this.storage = storage;
this.sd = sd;
}
public final StorageDirectory getStorageDirectory() {
return this.sd;
}
private void addBpStorageDirectories(String bpid,
List<StorageDirectory> dirs) {
bpStorageDirMap.put(bpid, dirs);
}
/**
* Add loaded metadata of a data volume to {@link DataStorage}.
*/
public void build() {
assert this.sd != null;
synchronized (storage) {
for (Map.Entry<String, List<StorageDirectory>> e :
bpStorageDirMap.entrySet()) {
final String bpid = e.getKey();
BlockPoolSliceStorage bpStorage = this.storage.bpStorageMap.get(bpid);
assert bpStorage != null;
for (StorageDirectory bpSd : e.getValue()) {
bpStorage.addStorageDir(bpSd);
}
}
storage.addStorageDir(sd);
}
}
}
private StorageDirectory loadStorageDirectory(DataNode datanode,
NamespaceInfo nsInfo, File dataDir, StartupOption startOpt)
throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, false);
try {
StorageState curState = sd.analyzeStorage(startOpt, this);
// sd is locked but not opened
switch (curState) {
case NORMAL:
break;
case NON_EXISTENT:
LOG.info("Storage directory " + dataDir + " does not exist");
throw new IOException("Storage directory " + dataDir
+ " does not exist");
case NOT_FORMATTED: // format
LOG.info("Storage directory " + dataDir + " is not formatted for "
+ nsInfo.getBlockPoolID());
LOG.info("Formatting ...");
format(sd, nsInfo, datanode.getDatanodeUuid());
break;
default: // recovery part is common
sd.doRecover(curState);
}
// 2. Do transitions
// Each storage directory is treated individually.
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
doTransition(datanode, sd, nsInfo, startOpt);
// 3. Update successfully loaded storage.
setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd);
return sd;
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
}
/**
* Prepare a storage directory. It creates a builder which can be used to add
* to the volume. If the volume cannot be added, it is OK to discard the
* builder later.
*
* @param datanode DataNode object.
* @param volume the root path of a storage directory.
* @param nsInfos an array of namespace infos.
* @return a VolumeBuilder that holds the metadata of this storage directory
* and can be added to DataStorage later.
* @throws IOException if encounters I/O errors.
*
* Note that if there is IOException, the state of DataStorage is not modified.
*/
public VolumeBuilder prepareVolume(DataNode datanode, File volume,
List<NamespaceInfo> nsInfos) throws IOException {
if (containsStorageDir(volume)) {
final String errorMessage = "Storage directory is in use";
LOG.warn(errorMessage + ".");
throw new IOException(errorMessage);
}
StorageDirectory sd = loadStorageDirectory(
datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP);
VolumeBuilder builder =
new VolumeBuilder(this, sd);
for (NamespaceInfo nsInfo : nsInfos) {
List<File> bpDataDirs = Lists.newArrayList();
bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(
nsInfo.getBlockPoolID(), new File(volume, STORAGE_DIR_CURRENT)));
makeBlockPoolDataDir(bpDataDirs, null);
BlockPoolSliceStorage bpStorage;
final String bpid = nsInfo.getBlockPoolID();
synchronized (this) {
bpStorage = this.bpStorageMap.get(bpid);
if (bpStorage == null) {
bpStorage = new BlockPoolSliceStorage(
nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
nsInfo.getClusterID());
addBlockPoolStorage(bpid, bpStorage);
}
}
builder.addBpStorageDirectories(
bpid, bpStorage.loadBpStorageDirectories(
datanode, nsInfo, bpDataDirs, StartupOption.HOTSWAP));
}
return builder;
}
/**
@ -207,27 +342,35 @@ public class DataStorage extends Storage {
* @param nsInfo namespace information
* @param dataDirs array of data storage directories
* @param startOpt startup option
* @return a list of successfully loaded volumes.
* @throws IOException
*/
synchronized void addStorageLocations(DataNode datanode,
@VisibleForTesting
synchronized List<StorageLocation> addStorageLocations(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt)
throws IOException {
// Similar to recoverTransitionRead, it first ensures the datanode level
// format is completed.
List<StorageLocation> tmpDataDirs =
new ArrayList<StorageLocation>(dataDirs);
addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
Collection<File> bpDataDirs = new ArrayList<File>();
String bpid = nsInfo.getBlockPoolID();
for (StorageLocation dir : dataDirs) {
File dnRoot = dir.getFile();
File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
STORAGE_DIR_CURRENT));
bpDataDirs.add(bpRoot);
StartupOption startOpt) throws IOException {
final String bpid = nsInfo.getBlockPoolID();
List<StorageLocation> successVolumes = Lists.newArrayList();
for (StorageLocation dataDir : dataDirs) {
File root = dataDir.getFile();
if (!containsStorageDir(root)) {
try {
// It first ensures the datanode level format is completed.
StorageDirectory sd = loadStorageDirectory(
datanode, nsInfo, root, startOpt);
addStorageDir(sd);
} catch (IOException e) {
LOG.warn(e);
continue;
}
// mkdir for the list of BlockPoolStorage
} else {
LOG.info("Storage directory " + dataDir + " has already been used.");
}
List<File> bpDataDirs = new ArrayList<File>();
bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(bpid, new File(root,
STORAGE_DIR_CURRENT)));
try {
makeBlockPoolDataDir(bpDataDirs, null);
BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
if (bpStorage == null) {
@ -238,126 +381,25 @@ public class DataStorage extends Storage {
bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
addBlockPoolStorage(bpid, bpStorage);
}
/**
* Add a list of volumes to be managed by this DataStorage. If the volume is
* empty, it formats the volume, otherwise it recovers it from previous
* transitions if required.
*
* If isInitialize is false, only the directories that have finished the
* doTransition() process will be added into DataStorage.
*
* @param datanode the reference to DataNode.
* @param nsInfo namespace information
* @param dataDirs array of data storage directories
* @param startOpt startup option
* @param isInitialize whether it is called when DataNode starts up.
* @throws IOException
*/
private synchronized void addStorageLocations(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
throws IOException {
Set<String> existingStorageDirs = new HashSet<String>();
for (int i = 0; i < getNumStorageDirs(); i++) {
existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
}
// 1. For each data directory calculate its state and check whether all is
// consistent before transitioning. Format and recover.
ArrayList<StorageState> dataDirStates =
new ArrayList<StorageState>(dataDirs.size());
List<StorageDirectory> addedStorageDirectories =
new ArrayList<StorageDirectory>();
for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next().getFile();
if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
LOG.info("Storage directory " + dataDir + " has already been used.");
it.remove();
continue;
}
StorageDirectory sd = new StorageDirectory(dataDir);
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt, this);
// sd is locked but not opened
switch (curState) {
case NORMAL:
break;
case NON_EXISTENT:
// ignore this storage
LOG.info("Storage directory " + dataDir + " does not exist");
it.remove();
continue;
case NOT_FORMATTED: // format
LOG.info("Storage directory " + dataDir + " is not formatted for "
+ nsInfo.getBlockPoolID());
LOG.info("Formatting ...");
format(sd, nsInfo, datanode.getDatanodeUuid());
break;
default: // recovery part is common
sd.doRecover(curState);
}
} catch (IOException ioe) {
sd.unlock();
LOG.warn("Ignoring storage directory " + dataDir
+ " due to an exception", ioe);
//continue with other good dirs
continue;
}
if (isInitialize) {
addStorageDir(sd);
}
addedStorageDirectories.add(sd);
dataDirStates.add(curState);
}
if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
// none of the data dirs exist
if (ignoreExistingDirs) {
return;
}
throw new IOException(
"All specified directories are not accessible or do not exist.");
}
// 2. Do transitions
// Each storage directory is treated individually.
// During startup some of them can upgrade or rollback
// while others could be up-to-date for the regular startup.
for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
it.hasNext(); ) {
StorageDirectory sd = it.next();
try {
doTransition(datanode, sd, nsInfo, startOpt);
createStorageID(sd);
} catch (IOException e) {
if (!isInitialize) {
sd.unlock();
it.remove();
LOG.warn("Failed to add storage for block pool: " + bpid + " : "
+ e.getMessage());
continue;
}
unlockAll();
throw e;
}
}
// 3. Update all successfully loaded storages. Some of them might have just
// been formatted.
this.writeAll(addedStorageDirectories);
// 4. Make newly loaded storage directories visible for service.
if (!isInitialize) {
this.storageDirs.addAll(addedStorageDirectories);
successVolumes.add(dataDir);
}
return successVolumes;
}
/**
* Remove volumes from DataStorage.
* Remove volumes from DataStorage. All volumes are removed even when the
* IOException is thrown.
*
* @param locations a collection of volumes.
* @throws IOException if I/O error when unlocking storage directory.
*/
synchronized void removeVolumes(Collection<StorageLocation> locations) {
synchronized void removeVolumes(Collection<StorageLocation> locations)
throws IOException {
if (locations.isEmpty()) {
return;
}
@ -371,6 +413,7 @@ public class DataStorage extends Storage {
bpsStorage.removeVolumes(dataDirs);
}
StringBuilder errorMsgBuilder = new StringBuilder();
for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
it.hasNext(); ) {
StorageDirectory sd = it.next();
@ -382,13 +425,18 @@ public class DataStorage extends Storage {
LOG.warn(String.format(
"I/O error attempting to unlock storage directory %s.",
sd.getRoot()), e);
errorMsgBuilder.append(String.format("Failed to remove %s: %s\n",
sd.getRoot(), e.getMessage()));
}
}
}
if (errorMsgBuilder.length() > 0) {
throw new IOException(errorMsgBuilder.toString());
}
}
/**
* Analyze storage directories.
* Analyze storage directories for a specific block pool.
* Recover from previous transitions if required.
* Perform fs state transition if necessary depending on the namespace info.
* Read storage info.
@ -396,60 +444,25 @@ public class DataStorage extends Storage {
* This method should be synchronized between multiple DN threads. Only the
* first DN thread does DN level storage dir recoverTransitionRead.
*
* @param nsInfo namespace information
* @param dataDirs array of data storage directories
* @param startOpt startup option
* @throws IOException
*/
synchronized void recoverTransitionRead(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt)
throws IOException {
if (initialized) {
// DN storage has been initialized, no need to do anything
return;
}
LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+ " and NameNode layout version: " + nsInfo.getLayoutVersion());
this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
// mark DN storage is initialized
this.initialized = true;
}
/**
* recoverTransitionRead for a specific block pool
*
* @param datanode DataNode
* @param bpID Block pool Id
* @param nsInfo Namespace info of namenode corresponding to the block pool
* @param dataDirs Storage directories
* @param startOpt startup option
* @throws IOException on error
*/
void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo,
void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {
// First ensure datanode level format/snapshot/rollback is completed
recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);
// Create list of storage directories for the block pool
Collection<File> bpDataDirs = new ArrayList<File>();
for(StorageLocation dir : dataDirs) {
File dnRoot = dir.getFile();
File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot,
STORAGE_DIR_CURRENT));
bpDataDirs.add(bpRoot);
if (this.initialized) {
LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+ " and NameNode layout version: " + nsInfo.getLayoutVersion());
this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
// mark DN storage is initialized
this.initialized = true;
}
// mkdir for the list of BlockPoolStorage
makeBlockPoolDataDir(bpDataDirs, null);
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
nsInfo.getNamespaceID(), bpID, nsInfo.getCTime(), nsInfo.getClusterID());
bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
addBlockPoolStorage(bpID, bpStorage);
if (addStorageLocations(datanode, nsInfo, dataDirs, startOpt).isEmpty()) {
throw new IOException("All specified directories are failed to load.");
}
}
/**
@ -665,12 +678,15 @@ public class DataStorage extends Storage {
// meaningful at BlockPoolSliceStorage level.
// regular start up.
if (this.layoutVersion == HdfsConstants.DATANODE_LAYOUT_VERSION)
if (this.layoutVersion == HdfsConstants.DATANODE_LAYOUT_VERSION) {
createStorageID(sd);
return; // regular startup
}
// do upgrade
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
doUpgrade(datanode, sd, nsInfo); // upgrade
createStorageID(sd);
return;
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@ -100,8 +101,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
public List<V> getVolumes();
/** Add an array of StorageLocation to FsDataset. */
public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
final Collection<String> bpids);
public void addVolume(
final StorageLocation location,
final List<NamespaceInfo> nsInfos) throws IOException;
/** Removes a collection of volumes from FsDataset. */
public void removeVolumes(Collection<StorageLocation> volumes);

View File

@ -31,7 +31,6 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@ -94,6 +93,7 @@ import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskRepli
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
@ -307,30 +307,39 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaMap tempVolumeMap = new ReplicaMap(this);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
synchronized (this) {
volumeMap.addAll(tempVolumeMap);
volumes.addVolume(fsVolume);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
volumes.addVolume(fsVolume);
}
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
private void addVolumeAndBlockPool(Collection<StorageLocation> dataLocations,
Storage.StorageDirectory sd, final Collection<String> bpids)
@Override
public void addVolume(final StorageLocation location,
final List<NamespaceInfo> nsInfos)
throws IOException {
final File dir = sd.getCurrentDir();
final StorageType storageType =
getStorageTypeFromLocations(dataLocations, sd.getRoot());
final File dir = location.getFile();
// Prepare volume in DataStorage
DataStorage.VolumeBuilder builder =
dataStorage.prepareVolume(datanode, location.getFile(), nsInfos);
final Storage.StorageDirectory sd = builder.getStorageDirectory();
StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
ArrayList<IOException> exceptions = Lists.newArrayList();
List<IOException> exceptions = Lists.newArrayList();
for (final String bpid : bpids) {
for (final NamespaceInfo nsInfo : nsInfos) {
String bpid = nsInfo.getBlockPoolID();
try {
fsVolume.addBlockPool(bpid, this.conf);
fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
@ -341,10 +350,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
if (!exceptions.isEmpty()) {
// The states of FsDatasteImpl are not modified, thus no need to rolled back.
throw MultipleIOException.createIOException(exceptions);
}
setupAsyncLazyPersistThread(fsVolume);
builder.build();
synchronized (this) {
volumeMap.addAll(tempVolumeMap);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
@ -352,80 +364,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
volumes.addVolume(fsVolume);
}
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
/**
* Add an array of StorageLocation to FsDataset.
*
* @pre dataStorage must have these volumes.
* @param volumes an array of storage locations for adding volumes.
* @param bpids block pool IDs.
* @return an array of successfully loaded volumes.
*/
@Override
public synchronized List<StorageLocation> addVolumes(
final List<StorageLocation> volumes, final Collection<String> bpids) {
final Collection<StorageLocation> dataLocations =
DataNode.getStorageLocations(this.conf);
final Map<String, Storage.StorageDirectory> allStorageDirs =
new HashMap<String, Storage.StorageDirectory>();
List<StorageLocation> succeedVolumes = Lists.newArrayList();
try {
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
allStorageDirs.put(sd.getRoot().getCanonicalPath(), sd);
}
} catch (IOException ioe) {
LOG.warn("Caught exception when parsing storage URL.", ioe);
return succeedVolumes;
}
final boolean[] successFlags = new boolean[volumes.size()];
Arrays.fill(successFlags, false);
List<Thread> volumeAddingThreads = Lists.newArrayList();
for (int i = 0; i < volumes.size(); i++) {
final int idx = i;
Thread t = new Thread() {
public void run() {
StorageLocation vol = volumes.get(idx);
try {
String key = vol.getFile().getCanonicalPath();
if (!allStorageDirs.containsKey(key)) {
LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
} else {
addVolumeAndBlockPool(dataLocations, allStorageDirs.get(key),
bpids);
successFlags[idx] = true;
}
} catch (IOException e) {
LOG.warn("Caught exception when adding volume " + vol, e);
}
}
};
volumeAddingThreads.add(t);
t.start();
}
for (Thread t : volumeAddingThreads) {
try {
t.join();
} catch (InterruptedException e) {
LOG.warn("Caught InterruptedException when adding volume.", e);
}
}
setupAsyncLazyPersistThreads();
for (int i = 0; i < volumes.size(); i++) {
if (successFlags[i]) {
succeedVolumes.add(volumes.get(i));
}
}
return succeedVolumes;
}
/**
* Removes a collection of volumes from FsDataset.
* @param volumes the root directories of the volumes.
@ -2476,13 +2418,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// added or removed.
// This should only be called when the FsDataSetImpl#volumes list is finalized.
private void setupAsyncLazyPersistThreads() {
boolean ramDiskConfigured = ramDiskConfigured();
for (FsVolumeImpl v: getVolumes()){
// Skip transient volumes
if (v.isTransientStorage()) {
continue;
setupAsyncLazyPersistThread(v);
}
}
private void setupAsyncLazyPersistThread(final FsVolumeImpl v) {
// Skip transient volumes
if (v.isTransientStorage()) {
return;
}
boolean ramDiskConfigured = ramDiskConfigured();
// Add thread for DISK volume if RamDisk is configured
if (ramDiskConfigured &&
!asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
@ -2495,7 +2441,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
asyncLazyPersistService.removeVolume(v.getCurrentDir());
}
}
}
class LazyWriter implements Runnable {
private volatile boolean shouldRun = true;

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@ -53,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
@ -1194,8 +1196,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
final Collection<String> bpids) {
public void addVolume(
final StorageLocation location,
final List<NamespaceInfo> nsInfos) throws IOException {
throw new UnsupportedOperationException();
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.BlockLocation;
@ -33,6 +35,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
@ -58,9 +62,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -197,10 +206,11 @@ public class TestDataNodeHotSwapVolumes {
}
assertFalse(oldLocations.isEmpty());
String newPaths = "/foo/path1,/foo/path2";
conf.set(DFS_DATANODE_DATA_DIR_KEY, newPaths);
String newPaths = oldLocations.get(0).getFile().getAbsolutePath() +
",/foo/path1,/foo/path2";
DataNode.ChangedVolumes changedVolumes =dn.parseChangedVolumes();
DataNode.ChangedVolumes changedVolumes =
dn.parseChangedVolumes(newPaths);
List<StorageLocation> newVolumes = changedVolumes.newLocations;
assertEquals(2, newVolumes.size());
assertEquals(new File("/foo/path1").getAbsolutePath(),
@ -209,21 +219,21 @@ public class TestDataNodeHotSwapVolumes {
newVolumes.get(1).getFile().getAbsolutePath());
List<StorageLocation> removedVolumes = changedVolumes.deactivateLocations;
assertEquals(oldLocations.size(), removedVolumes.size());
for (int i = 0; i < removedVolumes.size(); i++) {
assertEquals(oldLocations.get(i).getFile(),
removedVolumes.get(i).getFile());
}
assertEquals(1, removedVolumes.size());
assertEquals(oldLocations.get(1).getFile(),
removedVolumes.get(0).getFile());
assertEquals(1, changedVolumes.unchangedLocations.size());
assertEquals(oldLocations.get(0).getFile(),
changedVolumes.unchangedLocations.get(0).getFile());
}
@Test
public void testParseChangedVolumesFailures() throws IOException {
startDFSCluster(1, 1);
DataNode dn = cluster.getDataNodes().get(0);
Configuration conf = dn.getConf();
try {
conf.set(DFS_DATANODE_DATA_DIR_KEY, "");
dn.parseChangedVolumes();
dn.parseChangedVolumes("");
fail("Should throw IOException: empty inputs.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("No directory is specified.", e);
@ -231,7 +241,8 @@ public class TestDataNodeHotSwapVolumes {
}
/** Add volumes to the first DataNode. */
private void addVolumes(int numNewVolumes) throws ReconfigurationException {
private void addVolumes(int numNewVolumes)
throws ReconfigurationException, IOException {
File dataDir = new File(cluster.getDataDirectory());
DataNode dn = cluster.getDataNodes().get(0); // First DataNode.
Configuration conf = dn.getConf();
@ -253,12 +264,26 @@ public class TestDataNodeHotSwapVolumes {
newVolumeDirs.add(volumeDir);
volumeDir.mkdirs();
newDataDirBuf.append(",");
newDataDirBuf.append(volumeDir.toURI());
newDataDirBuf.append(
StorageLocation.parse(volumeDir.toString()).toString());
}
String newDataDir = newDataDirBuf.toString();
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir);
assertEquals(newDataDir, conf.get(DFS_DATANODE_DATA_DIR_KEY));
// Verify the configuration value is appropriately set.
String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
String[] expectDataDirs = newDataDir.split(",");
assertEquals(expectDataDirs.length, effectiveDataDirs.length);
for (int i = 0; i < expectDataDirs.length; i++) {
StorageLocation expectLocation = StorageLocation.parse(expectDataDirs[i]);
StorageLocation effectiveLocation =
StorageLocation.parse(effectiveDataDirs[i]);
assertEquals(expectLocation.getStorageType(),
effectiveLocation.getStorageType());
assertEquals(expectLocation.getFile().getCanonicalFile(),
effectiveLocation.getFile().getCanonicalFile());
}
// Check that all newly created volumes are appropriately formatted.
for (File volumeDir : newVolumeDirs) {
@ -447,6 +472,59 @@ public class TestDataNodeHotSwapVolumes {
DFSTestUtil.waitReplication(fs, testFile, replFactor);
}
@Test
public void testAddVolumeFailures() throws IOException {
startDFSCluster(1, 1);
final String dataDir = cluster.getDataDirectory();
DataNode dn = cluster.getDataNodes().get(0);
List<String> newDirs = Lists.newArrayList();
final int NUM_NEW_DIRS = 4;
for (int i = 0; i < NUM_NEW_DIRS; i++) {
File newVolume = new File(dataDir, "new_vol" + i);
newDirs.add(newVolume.toString());
if (i % 2 == 0) {
// Make addVolume() fail.
newVolume.createNewFile();
}
}
String newValue = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY) + "," +
Joiner.on(",").join(newDirs);
try {
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newValue);
fail("Expect to throw IOException.");
} catch (ReconfigurationException e) {
String errorMessage = e.getCause().getMessage();
String messages[] = errorMessage.split("\\r?\\n");
assertEquals(2, messages.length);
assertThat(messages[0], containsString("new_vol0"));
assertThat(messages[1], containsString("new_vol2"));
}
// Make sure that vol0 and vol2's metadata are not left in memory.
FsDatasetSpi<?> dataset = dn.getFSDataset();
for (FsVolumeSpi volume : dataset.getVolumes()) {
assertThat(volume.getBasePath(), is(not(anyOf(
is(newDirs.get(0)), is(newDirs.get(2))))));
}
DataStorage storage = dn.getStorage();
for (int i = 0; i < storage.getNumStorageDirs(); i++) {
Storage.StorageDirectory sd = storage.getStorageDir(i);
assertThat(sd.getRoot().toString(),
is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
}
// The newly effective conf does not have vol0 and vol2.
String[] effectiveVolumes =
dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY).split(",");
assertEquals(4, effectiveVolumes.length);
for (String ev : effectiveVolumes) {
assertThat(new File(ev).getCanonicalPath(),
is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
}
}
/**
* Asserts that the storage lock file in each given directory has been
* released. This method works by trying to acquire the lock file itself. If

View File

@ -146,14 +146,11 @@ public class TestDataStorage {
assertEquals(numLocations, storage.getNumStorageDirs());
locations = createStorageLocations(numLocations);
try {
List<StorageLocation> addedLocation =
storage.addStorageLocations(mockDN, namespaceInfos.get(0),
locations, START_OPT);
fail("Expected to throw IOException: adding active directories.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
"All specified directories are not accessible or do not exist.", e);
}
assertTrue(addedLocation.isEmpty());
// The number of active storage dirs has not changed, since it tries to
// add the storage dirs that are under service.
assertEquals(numLocations, storage.getNumStorageDirs());
@ -169,13 +166,12 @@ public class TestDataStorage {
final int numLocations = 3;
List<StorageLocation> locations =
createStorageLocations(numLocations, true);
try {
storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
fail("An IOException should throw: all StorageLocations are NON_EXISTENT");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
"All specified directories are not accessible or do not exist.", e);
"All specified directories are failed to load.", e);
}
assertEquals(0, storage.getNumStorageDirs());
}
@ -191,9 +187,9 @@ public class TestDataStorage {
throws IOException {
final int numLocations = 3;
List<StorageLocation> locations = createStorageLocations(numLocations);
String bpid = nsInfo.getBlockPoolID();
// Prepare volumes
storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
assertEquals(numLocations, storage.getNumStorageDirs());
// Reset DataStorage
storage.unlockAll();
@ -201,11 +197,11 @@ public class TestDataStorage {
// Trigger an exception from doTransition().
nsInfo.clusterID = "cluster1";
try {
storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
fail("Expect to throw an exception from doTransition()");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Incompatible clusterIDs", e);
GenericTestUtils.assertExceptionContains("All specified directories", e);
}
assertEquals(numLocations, storage.getNumStorageDirs());
assertEquals(0, storage.getNumStorageDirs());
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
@ -40,7 +42,6 @@ import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -48,7 +49,9 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -64,6 +67,7 @@ public class TestFsDatasetImpl {
new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE));
private Configuration conf;
private DataNode datanode;
private DataStorage storage;
private DataBlockScanner scanner;
private FsDatasetImpl dataset;
@ -94,7 +98,7 @@ public class TestFsDatasetImpl {
@Before
public void setUp() throws IOException {
final DataNode datanode = Mockito.mock(DataNode.class);
datanode = Mockito.mock(DataNode.class);
storage = Mockito.mock(DataStorage.class);
scanner = Mockito.mock(DataBlockScanner.class);
this.conf = new Configuration();
@ -119,17 +123,24 @@ public class TestFsDatasetImpl {
final int numNewVolumes = 3;
final int numExistingVolumes = dataset.getVolumes().size();
final int totalVolumes = numNewVolumes + numExistingVolumes;
List<StorageLocation> newLocations = new ArrayList<StorageLocation>();
Set<String> expectedVolumes = new HashSet<String>();
List<NamespaceInfo> nsInfos = Lists.newArrayList();
for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, "cluster-id", bpid, 1));
}
for (int i = 0; i < numNewVolumes; i++) {
String path = BASE_DIR + "/newData" + i;
newLocations.add(StorageLocation.parse(path));
when(storage.getStorageDir(numExistingVolumes + i))
.thenReturn(createStorageDirectory(new File(path)));
}
when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
StorageLocation loc = StorageLocation.parse(path);
Storage.StorageDirectory sd = createStorageDirectory(new File(path));
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
anyListOf(NamespaceInfo.class)))
.thenReturn(builder);
dataset.addVolume(loc, nsInfos);
}
dataset.addVolumes(newLocations, Arrays.asList(BLOCK_POOL_IDS));
assertEquals(totalVolumes, dataset.getVolumes().size());
assertEquals(totalVolumes, dataset.storageMap.size());