HDFS-8578. On upgrade, Datanode should process all storage/data dirs in parallel. Contributed by vinayakumarb and szetszwo
parent a2fdfff02d
commit 66289a3bf4
CHANGES.txt
@@ -2840,6 +2840,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-4946. Allow preferLocalNode in BlockPlacementPolicyDefault to be
     configurable (James Kinley and Nathan Roberts via kihwal)
 
+    HDFS-8578. On upgrade, Datanode should process all storage/data dirs in
+    parallel. (vinayakumarb and szetszwo via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES
DFSConfigKeys.java
@@ -836,6 +836,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.slow.io.warning.threshold.ms";
   public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300;
 
+  // Number of parallel threads to load multiple datanode volumes
+  public static final String DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY =
+      "dfs.datanode.parallel.volumes.load.threads.num";
   public static final String DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY =
       "dfs.datanode.block.id.layout.upgrade.threads";
   public static final int DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS = 12;
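The new key rides on the standard Configuration machinery. As a rough illustration (not part of this commit), reading and setting it looks like any other int-valued HDFS key; the value 4 below is an arbitrary example, and 6 stands in for dataDirs.size():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class ParallelLoadConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Cap volume-load/upgrade parallelism at 4 threads (arbitrary example).
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY, 4);
    // DataStorage defaults the value to the number of data directories;
    // 6 stands in for dataDirs.size() in this sketch.
    int n = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY, 6);
    System.out.println("volume load threads = " + n);
  }
}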
BlockPoolSliceStorage.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -145,7 +146,8 @@ public class BlockPoolSliceStorage extends Storage {
    * @throws IOException
    */
   private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
-      File dataDir, StartupOption startOpt, Configuration conf)
+      File dataDir, StartupOption startOpt,
+      List<Callable<StorageDirectory>> callables, Configuration conf)
       throws IOException {
     StorageDirectory sd = new StorageDirectory(dataDir, null, true);
     try {
@@ -172,19 +174,17 @@ public class BlockPoolSliceStorage extends Storage {
       // Each storage directory is treated individually.
       // During startup some of them can upgrade or roll back
       // while others could be up-to-date for the regular startup.
-      if (doTransition(sd, nsInfo, startOpt, conf)) {
-        return sd;
-      }
+      if (!doTransition(sd, nsInfo, startOpt, callables, conf)) {
 
-      if (getCTime() != nsInfo.getCTime()) {
-        throw new IOException("Datanode CTime (=" + getCTime()
-            + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
+        // 3. Check CTime and update successfully loaded storage.
+        if (getCTime() != nsInfo.getCTime()) {
+          throw new IOException("Datanode CTime (=" + getCTime()
+              + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
+        }
+        setServiceLayoutVersion(getServiceLayoutVersion());
+        writeProperties(sd);
       }
-
-      // 3. Update successfully loaded storage.
-      setServiceLayoutVersion(getServiceLayoutVersion());
-      writeProperties(sd);
 
       return sd;
     } catch (IOException ioe) {
       sd.unlock();
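Note the inversion in this hunk: doTransition now reports whether an upgrade was started (possibly only queued on the callables list), so the CTime check and writeProperties run only on the regular-startup path; a deferred upgrade finalizes later, when its task executes. A minimal standalone sketch of that contract, with hypothetical stand-in types rather than the real StorageDirectory:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class TransitionContractSketch {
  // Hypothetical stand-in for StorageDirectory.
  static class Dir {
    final String name;
    final boolean needsUpgrade;
    Dir(String name, boolean needsUpgrade) {
      this.name = name;
      this.needsUpgrade = needsUpgrade;
    }
  }

  // Returns true when an upgrade was initiated; the heavy work is only
  // queued on the callables list, not executed here.
  static boolean doTransition(Dir d, List<Callable<Dir>> callables) {
    if (d.needsUpgrade) {
      callables.add(() -> d); // placeholder for the real upgrade work
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    List<Callable<Dir>> callables = new ArrayList<>();
    for (Dir d : new Dir[] {new Dir("dataA", false), new Dir("dataB", true)}) {
      if (!doTransition(d, callables)) {
        // Regular startup path: validate CTime and persist properties now.
        System.out.println(d.name + ": finalized inline");
      } else {
        System.out.println(d.name + ": upgrade queued for the executor");
      }
    }
  }
}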
@@ -208,7 +208,8 @@ public class BlockPoolSliceStorage extends Storage {
    */
   List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
       Collection<File> dataDirs, StartupOption startOpt,
-      Configuration conf) throws IOException {
+      List<Callable<StorageDirectory>> callables, Configuration conf)
+      throws IOException {
     List<StorageDirectory> succeedDirs = Lists.newArrayList();
     try {
       for (File dataDir : dataDirs) {
@@ -218,7 +219,7 @@ public class BlockPoolSliceStorage extends Storage {
               "attempt to load an used block storage: " + dataDir);
         }
         final StorageDirectory sd = loadStorageDirectory(
-            nsInfo, dataDir, startOpt, conf);
+            nsInfo, dataDir, startOpt, callables, conf);
         succeedDirs.add(sd);
       }
     } catch (IOException e) {
@@ -242,11 +243,12 @@ public class BlockPoolSliceStorage extends Storage {
    * @throws IOException on error
    */
   List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
-      Collection<File> dataDirs, StartupOption startOpt, Configuration conf)
+      Collection<File> dataDirs, StartupOption startOpt,
+      List<Callable<StorageDirectory>> callables, Configuration conf)
       throws IOException {
     LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
     final List<StorageDirectory> loaded = loadBpStorageDirectories(
-        nsInfo, dataDirs, startOpt, conf);
+        nsInfo, dataDirs, startOpt, callables, conf);
     for (StorageDirectory sd : loaded) {
       addStorageDir(sd);
     }
@@ -353,7 +355,8 @@ public class BlockPoolSliceStorage extends Storage {
    * @return true if the new properties has been written.
    */
   private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
-      StartupOption startOpt, Configuration conf) throws IOException {
+      StartupOption startOpt, List<Callable<StorageDirectory>> callables,
+      Configuration conf) throws IOException {
     if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
       Preconditions.checkState(!getTrashRootDir(sd).exists(),
           sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
@@ -395,7 +398,7 @@ public class BlockPoolSliceStorage extends Storage {
     }
     if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
-      doUpgrade(sd, nsInfo, conf); // upgrade
+      doUpgrade(sd, nsInfo, callables, conf); // upgrade
       return true;
     }
     // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
@@ -425,7 +428,9 @@ public class BlockPoolSliceStorage extends Storage {
    * @throws IOException on error
    */
   private void doUpgrade(final StorageDirectory bpSd,
-      final NamespaceInfo nsInfo, final Configuration conf) throws IOException {
+      final NamespaceInfo nsInfo,
+      final List<Callable<StorageDirectory>> callables,
+      final Configuration conf) throws IOException {
     // Upgrading is applicable only to release with federation or after
     if (!DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.FEDERATION, layoutVersion)) {
@@ -463,10 +468,21 @@ public class BlockPoolSliceStorage extends Storage {
     rename(bpCurDir, bpTmpDir);
 
     final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot();
-    doUgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
+    if (callables == null) {
+      doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
+    } else {
+      callables.add(new Callable<StorageDirectory>() {
+        @Override
+        public StorageDirectory call() throws Exception {
+          doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV,
+              conf);
+          return bpSd;
+        }
+      });
+    }
   }
 
-  private void doUgrade(String name, final StorageDirectory bpSd,
+  private void doUpgrade(String name, final StorageDirectory bpSd,
       NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir,
       final File bpCurDir, final int oldLV, Configuration conf)
       throws IOException {
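The null check is what keeps both call sites working: hot-swap paths (seen later in DataStorage) pass null and get the old synchronous upgrade, while the startup path passes a list and gets deferral. A small self-contained sketch of this run-now-or-queue pattern, with placeholder names that are not Hadoop APIs:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class DeferOrRunSketch {
  static class Work { final String name; Work(String n) { name = n; } }

  static void upgrade(Work w) { System.out.println("upgrading " + w.name); }

  // With callables == null the upgrade runs synchronously (the pre-patch
  // behavior, still used for hot-swap); otherwise it is only queued.
  static void doUpgrade(Work w, List<Callable<Work>> callables) {
    if (callables == null) {
      upgrade(w);
    } else {
      callables.add(() -> { upgrade(w); return w; });
    }
  }

  public static void main(String[] args) throws Exception {
    doUpgrade(new Work("inline"), null);         // immediate
    List<Callable<Work>> queue = new ArrayList<>();
    doUpgrade(new Work("deferred"), queue);      // queued
    for (Callable<Work> c : queue) {
      c.call();                                  // caller decides when/where
    }
  }
}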
DataStorage.java
@@ -37,10 +37,12 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -51,6 +53,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -260,8 +263,8 @@ public class DataStorage extends Storage {
   }
 
   private StorageDirectory loadStorageDirectory(DataNode datanode,
-      NamespaceInfo nsInfo, File dataDir, StartupOption startOpt)
-      throws IOException {
+      NamespaceInfo nsInfo, File dataDir, StartupOption startOpt,
+      List<Callable<StorageDirectory>> callables) throws IOException {
     StorageDirectory sd = new StorageDirectory(dataDir, null, false);
     try {
       StorageState curState = sd.analyzeStorage(startOpt, this);
@@ -287,13 +290,12 @@ public class DataStorage extends Storage {
       // Each storage directory is treated individually.
       // During startup some of them can upgrade or roll back
       // while others could be up-to-date for the regular startup.
-      if (doTransition(sd, nsInfo, startOpt, datanode.getConf())) {
-        return sd;
-      }
+      if (!doTransition(sd, nsInfo, startOpt, callables, datanode.getConf())) {
 
-      // 3. Update successfully loaded storage.
-      setServiceLayoutVersion(getServiceLayoutVersion());
-      writeProperties(sd);
+        // 3. Update successfully loaded storage.
+        setServiceLayoutVersion(getServiceLayoutVersion());
+        writeProperties(sd);
+      }
 
       return sd;
     } catch (IOException ioe) {
@@ -325,7 +327,7 @@ public class DataStorage extends Storage {
     }
 
     StorageDirectory sd = loadStorageDirectory(
-        datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP);
+        datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP, null);
     VolumeBuilder builder =
         new VolumeBuilder(this, sd);
     for (NamespaceInfo nsInfo : nsInfos) {
@@ -336,12 +338,35 @@ public class DataStorage extends Storage {
 
       final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
       final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
-          nsInfo, bpDataDirs, StartupOption.HOTSWAP, datanode.getConf());
+          nsInfo, bpDataDirs, StartupOption.HOTSWAP, null, datanode.getConf());
       builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
     }
     return builder;
   }
 
+  static int getParallelVolumeLoadThreadsNum(int dataDirs, Configuration conf) {
+    final String key
+        = DFSConfigKeys.DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY;
+    final int n = conf.getInt(key, dataDirs);
+    if (n < 1) {
+      throw new HadoopIllegalArgumentException(key + " = " + n + " < 1");
+    }
+    final int min = Math.min(n, dataDirs);
+    LOG.info("Using " + min + " threads to upgrade data directories ("
+        + key + "=" + n + ", dataDirs=" + dataDirs + ")");
+    return min;
+  }
+
+  static class UpgradeTask {
+    private final StorageLocation dataDir;
+    private final Future<StorageDirectory> future;
+
+    UpgradeTask(StorageLocation dataDir, Future<StorageDirectory> future) {
+      this.dataDir = dataDir;
+      this.future = future;
+    }
+  }
+
   /**
    * Add a list of volumes to be managed by DataStorage. If the volume is empty,
    * format it, otherwise recover it from previous transitions if required.
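The sizing rule in getParallelVolumeLoadThreadsNum is: default to one thread per data directory, reject anything below 1, and never run more threads than there are directories. Restated as a standalone sketch (plain IllegalArgumentException stands in for HadoopIllegalArgumentException):

public class ThreadCountRule {
  // configured == null models the key being unset (conf.getInt(key, dataDirs)).
  static int threadsFor(int dataDirs, Integer configured) {
    int n = (configured != null) ? configured : dataDirs;
    if (n < 1) {
      throw new IllegalArgumentException("threads = " + n + " < 1");
    }
    return Math.min(n, dataDirs);
  }

  public static void main(String[] args) {
    System.out.println(threadsFor(6, null)); // 6: one thread per directory
    System.out.println(threadsFor(6, 2));    // 2: configured cap applies
    System.out.println(threadsFor(2, 8));    // 2: never more than directories
  }
}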
@@ -356,32 +381,62 @@ public class DataStorage extends Storage {
   synchronized List<StorageDirectory> addStorageLocations(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
       StartupOption startOpt) throws IOException {
-    final List<StorageLocation> successLocations = loadDataStorage(
-        datanode, nsInfo, dataDirs, startOpt);
-    return loadBlockPoolSliceStorage(
-        datanode, nsInfo, successLocations, startOpt);
+    final int numThreads = getParallelVolumeLoadThreadsNum(
+        dataDirs.size(), datanode.getConf());
+    final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    try {
+      final List<StorageLocation> successLocations = loadDataStorage(
+          datanode, nsInfo, dataDirs, startOpt, executor);
+      return loadBlockPoolSliceStorage(
+          datanode, nsInfo, successLocations, startOpt, executor);
+    } finally {
+      executor.shutdown();
+    }
   }
 
   private List<StorageLocation> loadDataStorage(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
-      StartupOption startOpt) throws IOException {
+      StartupOption startOpt, ExecutorService executor) throws IOException {
     final List<StorageLocation> success = Lists.newArrayList();
+    final List<UpgradeTask> tasks = Lists.newArrayList();
     for (StorageLocation dataDir : dataDirs) {
       File root = dataDir.getFile();
       if (!containsStorageDir(root)) {
         try {
           // It first ensures the datanode level format is completed.
+          final List<Callable<StorageDirectory>> callables
+              = Lists.newArrayList();
           final StorageDirectory sd = loadStorageDirectory(
-              datanode, nsInfo, root, startOpt);
-          addStorageDir(sd);
+              datanode, nsInfo, root, startOpt, callables);
+          if (callables.isEmpty()) {
+            addStorageDir(sd);
+            success.add(dataDir);
+          } else {
+            for(Callable<StorageDirectory> c : callables) {
+              tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
+            }
+          }
         } catch (IOException e) {
           LOG.warn("Failed to add storage directory " + dataDir, e);
           continue;
         }
       } else {
         LOG.info("Storage directory " + dataDir + " has already been used.");
+        success.add(dataDir);
       }
-      success.add(dataDir);
     }
 
+    if (!tasks.isEmpty()) {
+      LOG.info("loadDataStorage: " + tasks.size() + " upgrade tasks");
+      for(UpgradeTask t : tasks) {
+        try {
+          addStorageDir(t.future.get());
+          success.add(t.dataDir);
+        } catch (ExecutionException e) {
+          LOG.warn("Failed to upgrade storage directory " + t.dataDir, e);
+        } catch (InterruptedException e) {
+          throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
+        }
+      }
+    }
+
     return success;
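Taken together, addStorageLocations and loadDataStorage follow a plain fan-out/join shape: gather one Callable per directory that needs an upgrade, submit them all to a fixed-size pool, then join the Futures and keep only the directories whose tasks succeeded. A self-contained sketch of that shape, with simulated directories and a staged failure in place of DataNode code:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FanOutJoinSketch {
  public static void main(String[] args) throws InterruptedException {
    List<String> dirs = Arrays.asList("/data/1", "/data/2", "/data/3");
    ExecutorService executor = Executors.newFixedThreadPool(dirs.size());
    try {
      // Fan out: one upgrade task per directory.
      List<Future<String>> futures = new ArrayList<>();
      for (final String d : dirs) {
        Callable<String> upgradeTask = () -> {
          if (d.endsWith("2")) {
            throw new IllegalStateException("simulated upgrade failure");
          }
          return d;
        };
        futures.add(executor.submit(upgradeTask));
      }
      // Join: collect successes, log and skip failures (as the patch does).
      List<String> success = new ArrayList<>();
      for (Future<String> f : futures) {
        try {
          success.add(f.get());
        } catch (ExecutionException e) {
          System.err.println("upgrade failed: " + e.getCause());
        }
      }
      System.out.println("upgraded: " + success);
    } finally {
      executor.shutdown();
    }
  }
}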
@@ -389,10 +444,11 @@ public class DataStorage extends Storage {
 
   private List<StorageDirectory> loadBlockPoolSliceStorage(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
-      StartupOption startOpt) throws IOException {
+      StartupOption startOpt, ExecutorService executor) throws IOException {
     final String bpid = nsInfo.getBlockPoolID();
     final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
     final List<StorageDirectory> success = Lists.newArrayList();
+    final List<UpgradeTask> tasks = Lists.newArrayList();
     for (StorageLocation dataDir : dataDirs) {
       final File curDir = new File(dataDir.getFile(), STORAGE_DIR_CURRENT);
       List<File> bpDataDirs = new ArrayList<File>();
@@ -400,10 +456,17 @@ public class DataStorage extends Storage {
       try {
         makeBlockPoolDataDir(bpDataDirs, null);
 
+        final List<Callable<StorageDirectory>> callables = Lists.newArrayList();
         final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
-            nsInfo, bpDataDirs, startOpt, datanode.getConf());
-        for(StorageDirectory sd : dirs) {
-          success.add(sd);
+            nsInfo, bpDataDirs, startOpt, callables, datanode.getConf());
+        if (callables.isEmpty()) {
+          for(StorageDirectory sd : dirs) {
+            success.add(sd);
+          }
+        } else {
+          for(Callable<StorageDirectory> c : callables) {
+            tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
+          }
         }
       } catch (IOException e) {
         LOG.warn("Failed to add storage directory " + dataDir
@@ -411,6 +474,20 @@ public class DataStorage extends Storage {
       }
     }
 
+    if (!tasks.isEmpty()) {
+      LOG.info("loadBlockPoolSliceStorage: " + tasks.size() + " upgrade tasks");
+      for(UpgradeTask t : tasks) {
+        try {
+          success.add(t.future.get());
+        } catch (ExecutionException e) {
+          LOG.warn("Failed to upgrade storage directory " + t.dataDir
+              + " for block pool " + bpid, e);
+        } catch (InterruptedException e) {
+          throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
+        }
+      }
+    }
+
     return success;
   }
 
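One detail worth noting in both join loops: an ExecutionException only costs the affected directory, while an InterruptedException aborts the whole load, converted by DFSUtilClient.toInterruptedIOException into an InterruptedIOException so IO-layer callers see a checked IOException with the cause preserved. A sketch of that conversion, using a local helper rather than the Hadoop method itself:

import java.io.InterruptedIOException;

public class InterruptConversionSketch {
  // Local stand-in mirroring the intent of
  // DFSUtilClient.toInterruptedIOException; not the Hadoop method.
  static InterruptedIOException toInterruptedIOException(
      String message, InterruptedException e) {
    final InterruptedIOException iioe = new InterruptedIOException(message);
    iioe.initCause(e);
    return iioe;
  }

  public static void main(String[] args) {
    try {
      Thread.currentThread().interrupt();
      Thread.sleep(10); // throws immediately because we are interrupted
    } catch (InterruptedException e) {
      // IO-layer callers expect IOException subclasses; wrap and surface it.
      InterruptedIOException io =
          toInterruptedIOException("Task interrupted", e);
      System.out.println(io + " caused by " + io.getCause());
    }
  }
}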
@@ -655,7 +732,8 @@ public class DataStorage extends Storage {
    * @return true if the new properties has been written.
    */
   private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
-      StartupOption startOpt, Configuration conf) throws IOException {
+      StartupOption startOpt, List<Callable<StorageDirectory>> callables,
+      Configuration conf) throws IOException {
     if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
     }
@@ -697,7 +775,7 @@ public class DataStorage extends Storage {
       // simply update the properties.
       upgradeProperties(sd);
     } else {
-      doUpgradePreFederation(sd, nsInfo, conf);
+      doUpgradePreFederation(sd, nsInfo, callables, conf);
     }
     return true; // doUgrade already has written properties
   }
@@ -734,7 +812,9 @@ public class DataStorage extends Storage {
    * @param sd storage directory
    */
   void doUpgradePreFederation(final StorageDirectory sd,
-      final NamespaceInfo nsInfo, final Configuration conf) throws IOException {
+      final NamespaceInfo nsInfo,
+      final List<Callable<StorageDirectory>> callables,
+      final Configuration conf) throws IOException {
     final int oldLV = getLayoutVersion();
     LOG.info("Upgrading storage directory " + sd.getRoot()
         + ".\n   old LV = " + oldLV
@@ -767,10 +847,20 @@ public class DataStorage extends Storage {
     bpStorage.format(curDir, nsInfo);
 
     final File toDir = new File(curBpDir, STORAGE_DIR_CURRENT);
-    doUgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+    if (callables == null) {
+      doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+    } else {
+      callables.add(new Callable<StorageDirectory>() {
+        @Override
+        public StorageDirectory call() throws Exception {
+          doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+          return sd;
+        }
+      });
+    }
   }
 
-  private void doUgrade(final StorageDirectory sd,
+  private void doUpgrade(final StorageDirectory sd,
       final NamespaceInfo nsInfo, final File prevDir,
       final File tmpDir, final File bbwDir, final File toDir, final int oldLV,
       Configuration conf) throws IOException {