HDFS-8578. On upgrade, Datanode should process all storage/data dirs in parallel. Contributed by vinayakumarb and szetszwo

This commit is contained in:
Tsz-Wo Nicholas Sze 2016-02-22 15:01:15 -08:00
parent 56b7f2c2c1
commit 9b10373b2b
4 changed files with 161 additions and 48 deletions

View File

@ -1910,6 +1910,9 @@ Release 2.7.3 - UNRELEASED
HDFS-4946. Allow preferLocalNode in BlockPlacementPolicyDefault to be
configurable (James Kinley and Nathan Roberts via kihwal)
HDFS-8578. On upgrade, Datanode should process all storage/data dirs in
parallel. (vinayakumarb and szetszwo via szetszwo)
OPTIMIZATIONS
BUG FIXES

View File

@ -807,6 +807,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.slow.io.warning.threshold.ms";
public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300;
// Number of parallel threads to load multiple datanode volumes
public static final String DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY =
"dfs.datanode.parallel.volumes.load.threads.num";
public static final String DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY =
"dfs.namenode.inotify.max.events.per.rpc";
public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT =

View File

@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -145,7 +146,8 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException
*/
private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
File dataDir, StartupOption startOpt, Configuration conf)
File dataDir, StartupOption startOpt,
List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, true);
try {
@ -172,19 +174,17 @@ public class BlockPoolSliceStorage extends Storage {
// Each storage directory is treated individually.
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
if (doTransition(sd, nsInfo, startOpt, conf)) {
return sd;
}
if (!doTransition(sd, nsInfo, startOpt, callables, conf)) {
if (getCTime() != nsInfo.getCTime()) {
throw new IOException("Datanode CTime (=" + getCTime()
+ ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
// 3. Check CTime and update successfully loaded storage.
if (getCTime() != nsInfo.getCTime()) {
throw new IOException("Datanode CTime (=" + getCTime()
+ ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
}
setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd);
}
// 3. Update successfully loaded storage.
setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd);
return sd;
} catch (IOException ioe) {
sd.unlock();
@ -208,7 +208,8 @@ public class BlockPoolSliceStorage extends Storage {
*/
List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt,
Configuration conf) throws IOException {
List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException {
List<StorageDirectory> succeedDirs = Lists.newArrayList();
try {
for (File dataDir : dataDirs) {
@ -218,7 +219,7 @@ public class BlockPoolSliceStorage extends Storage {
"attempt to load an used block storage: " + dataDir);
}
final StorageDirectory sd = loadStorageDirectory(
nsInfo, dataDir, startOpt, conf);
nsInfo, dataDir, startOpt, callables, conf);
succeedDirs.add(sd);
}
} catch (IOException e) {
@ -242,11 +243,12 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException on error
*/
List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt, Configuration conf)
Collection<File> dataDirs, StartupOption startOpt,
List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException {
LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
final List<StorageDirectory> loaded = loadBpStorageDirectories(
nsInfo, dataDirs, startOpt, conf);
nsInfo, dataDirs, startOpt, callables, conf);
for (StorageDirectory sd : loaded) {
addStorageDir(sd);
}
@ -353,7 +355,8 @@ public class BlockPoolSliceStorage extends Storage {
* @return true if the new properties has been written.
*/
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
StartupOption startOpt, Configuration conf) throws IOException {
StartupOption startOpt, List<Callable<StorageDirectory>> callables,
Configuration conf) throws IOException {
if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
Preconditions.checkState(!getTrashRootDir(sd).exists(),
sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
@ -395,7 +398,7 @@ public class BlockPoolSliceStorage extends Storage {
}
if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) {
doUpgrade(sd, nsInfo, conf); // upgrade
doUpgrade(sd, nsInfo, callables, conf); // upgrade
return true;
}
// layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
@ -425,7 +428,9 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException on error
*/
private void doUpgrade(final StorageDirectory bpSd,
final NamespaceInfo nsInfo, final Configuration conf) throws IOException {
final NamespaceInfo nsInfo,
final List<Callable<StorageDirectory>> callables,
final Configuration conf) throws IOException {
// Upgrading is applicable only to release with federation or after
if (!DataNodeLayoutVersion.supports(
LayoutVersion.Feature.FEDERATION, layoutVersion)) {
@ -463,10 +468,21 @@ public class BlockPoolSliceStorage extends Storage {
rename(bpCurDir, bpTmpDir);
final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot();
doUgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
if (callables == null) {
doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
} else {
callables.add(new Callable<StorageDirectory>() {
@Override
public StorageDirectory call() throws Exception {
doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV,
conf);
return bpSd;
}
});
}
}
private void doUgrade(String name, final StorageDirectory bpSd,
private void doUpgrade(String name, final StorageDirectory bpSd,
NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir,
final File bpCurDir, final int oldLV, Configuration conf)
throws IOException {

View File

@ -37,10 +37,12 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -51,6 +53,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@ -260,8 +263,8 @@ public class DataStorage extends Storage {
}
private StorageDirectory loadStorageDirectory(DataNode datanode,
NamespaceInfo nsInfo, File dataDir, StartupOption startOpt)
throws IOException {
NamespaceInfo nsInfo, File dataDir, StartupOption startOpt,
List<Callable<StorageDirectory>> callables) throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, false);
try {
StorageState curState = sd.analyzeStorage(startOpt, this);
@ -287,13 +290,12 @@ public class DataStorage extends Storage {
// Each storage directory is treated individually.
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
if (doTransition(sd, nsInfo, startOpt, datanode.getConf())) {
return sd;
}
if (!doTransition(sd, nsInfo, startOpt, callables, datanode.getConf())) {
// 3. Update successfully loaded storage.
setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd);
// 3. Update successfully loaded storage.
setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd);
}
return sd;
} catch (IOException ioe) {
@ -325,7 +327,7 @@ public class DataStorage extends Storage {
}
StorageDirectory sd = loadStorageDirectory(
datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP);
datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP, null);
VolumeBuilder builder =
new VolumeBuilder(this, sd);
for (NamespaceInfo nsInfo : nsInfos) {
@ -336,12 +338,35 @@ public class DataStorage extends Storage {
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
nsInfo, bpDataDirs, StartupOption.HOTSWAP, datanode.getConf());
nsInfo, bpDataDirs, StartupOption.HOTSWAP, null, datanode.getConf());
builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
}
return builder;
}
static int getParallelVolumeLoadThreadsNum(int dataDirs, Configuration conf) {
final String key
= DFSConfigKeys.DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY;
final int n = conf.getInt(key, dataDirs);
if (n < 1) {
throw new HadoopIllegalArgumentException(key + " = " + n + " < 1");
}
final int min = Math.min(n, dataDirs);
LOG.info("Using " + min + " threads to upgrade data directories ("
+ key + "=" + n + ", dataDirs=" + dataDirs + ")");
return min;
}
static class UpgradeTask {
private final StorageLocation dataDir;
private final Future<StorageDirectory> future;
UpgradeTask(StorageLocation dataDir, Future<StorageDirectory> future) {
this.dataDir = dataDir;
this.future = future;
}
}
/**
* Add a list of volumes to be managed by DataStorage. If the volume is empty,
* format it, otherwise recover it from previous transitions if required.
@ -356,32 +381,62 @@ public class DataStorage extends Storage {
synchronized List<StorageDirectory> addStorageLocations(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt) throws IOException {
final List<StorageLocation> successLocations = loadDataStorage(
datanode, nsInfo, dataDirs, startOpt);
return loadBlockPoolSliceStorage(
datanode, nsInfo, successLocations, startOpt);
final int numThreads = getParallelVolumeLoadThreadsNum(
dataDirs.size(), datanode.getConf());
final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
try {
final List<StorageLocation> successLocations = loadDataStorage(
datanode, nsInfo, dataDirs, startOpt, executor);
return loadBlockPoolSliceStorage(
datanode, nsInfo, successLocations, startOpt, executor);
} finally {
executor.shutdown();
}
}
private List<StorageLocation> loadDataStorage(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt) throws IOException {
StartupOption startOpt, ExecutorService executor) throws IOException {
final List<StorageLocation> success = Lists.newArrayList();
final List<UpgradeTask> tasks = Lists.newArrayList();
for (StorageLocation dataDir : dataDirs) {
File root = dataDir.getFile();
if (!containsStorageDir(root)) {
try {
// It first ensures the datanode level format is completed.
final List<Callable<StorageDirectory>> callables
= Lists.newArrayList();
final StorageDirectory sd = loadStorageDirectory(
datanode, nsInfo, root, startOpt);
addStorageDir(sd);
datanode, nsInfo, root, startOpt, callables);
if (callables.isEmpty()) {
addStorageDir(sd);
success.add(dataDir);
} else {
for(Callable<StorageDirectory> c : callables) {
tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
}
}
} catch (IOException e) {
LOG.warn("Failed to add storage directory " + dataDir, e);
continue;
}
} else {
LOG.info("Storage directory " + dataDir + " has already been used.");
success.add(dataDir);
}
}
if (!tasks.isEmpty()) {
LOG.info("loadDataStorage: " + tasks.size() + " upgrade tasks");
for(UpgradeTask t : tasks) {
try {
addStorageDir(t.future.get());
success.add(t.dataDir);
} catch (ExecutionException e) {
LOG.warn("Failed to upgrade storage directory " + t.dataDir, e);
} catch (InterruptedException e) {
throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
}
}
success.add(dataDir);
}
return success;
@ -389,10 +444,11 @@ public class DataStorage extends Storage {
private List<StorageDirectory> loadBlockPoolSliceStorage(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt) throws IOException {
StartupOption startOpt, ExecutorService executor) throws IOException {
final String bpid = nsInfo.getBlockPoolID();
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
final List<StorageDirectory> success = Lists.newArrayList();
final List<UpgradeTask> tasks = Lists.newArrayList();
for (StorageLocation dataDir : dataDirs) {
final File curDir = new File(dataDir.getFile(), STORAGE_DIR_CURRENT);
List<File> bpDataDirs = new ArrayList<File>();
@ -400,10 +456,17 @@ public class DataStorage extends Storage {
try {
makeBlockPoolDataDir(bpDataDirs, null);
final List<Callable<StorageDirectory>> callables = Lists.newArrayList();
final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
nsInfo, bpDataDirs, startOpt, datanode.getConf());
for(StorageDirectory sd : dirs) {
success.add(sd);
nsInfo, bpDataDirs, startOpt, callables, datanode.getConf());
if (callables.isEmpty()) {
for(StorageDirectory sd : dirs) {
success.add(sd);
}
} else {
for(Callable<StorageDirectory> c : callables) {
tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
}
}
} catch (IOException e) {
LOG.warn("Failed to add storage directory " + dataDir
@ -411,6 +474,20 @@ public class DataStorage extends Storage {
}
}
if (!tasks.isEmpty()) {
LOG.info("loadBlockPoolSliceStorage: " + tasks.size() + " upgrade tasks");
for(UpgradeTask t : tasks) {
try {
success.add(t.future.get());
} catch (ExecutionException e) {
LOG.warn("Failed to upgrade storage directory " + t.dataDir
+ " for block pool " + bpid, e);
} catch (InterruptedException e) {
throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
}
}
}
return success;
}
@ -655,7 +732,8 @@ public class DataStorage extends Storage {
* @return true if the new properties has been written.
*/
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
StartupOption startOpt, Configuration conf) throws IOException {
StartupOption startOpt, List<Callable<StorageDirectory>> callables,
Configuration conf) throws IOException {
if (startOpt == StartupOption.ROLLBACK) {
doRollback(sd, nsInfo); // rollback if applicable
}
@ -697,7 +775,7 @@ public class DataStorage extends Storage {
// simply update the properties.
upgradeProperties(sd);
} else {
doUpgradePreFederation(sd, nsInfo, conf);
doUpgradePreFederation(sd, nsInfo, callables, conf);
}
return true; // doUgrade already has written properties
}
@ -734,7 +812,9 @@ public class DataStorage extends Storage {
* @param sd storage directory
*/
void doUpgradePreFederation(final StorageDirectory sd,
final NamespaceInfo nsInfo, final Configuration conf) throws IOException {
final NamespaceInfo nsInfo,
final List<Callable<StorageDirectory>> callables,
final Configuration conf) throws IOException {
final int oldLV = getLayoutVersion();
LOG.info("Upgrading storage directory " + sd.getRoot()
+ ".\n old LV = " + oldLV
@ -767,10 +847,20 @@ public class DataStorage extends Storage {
bpStorage.format(curDir, nsInfo);
final File toDir = new File(curBpDir, STORAGE_DIR_CURRENT);
doUgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
if (callables == null) {
doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
} else {
callables.add(new Callable<StorageDirectory>() {
@Override
public StorageDirectory call() throws Exception {
doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
return sd;
}
});
}
}
private void doUgrade(final StorageDirectory sd,
private void doUpgrade(final StorageDirectory sd,
final NamespaceInfo nsInfo, final File prevDir,
final File tmpDir, final File bbwDir, final File toDir, final int oldLV,
Configuration conf) throws IOException {