From 66289a3bf403f307844ea0b6ceed35b603d12c0b Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Mon, 22 Feb 2016 15:01:15 -0800 Subject: [PATCH] HDFS-8578. On upgrade, Datanode should process all storage/data dirs in parallel. Contributed by vinayakumarb and szetszwo --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 + .../datanode/BlockPoolSliceStorage.java | 56 ++++--- .../hdfs/server/datanode/DataStorage.java | 146 ++++++++++++++---- 4 files changed, 160 insertions(+), 48 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c39bcee90ac..256df39fadf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -2840,6 +2840,9 @@ Release 2.7.3 - UNRELEASED HDFS-4946. Allow preferLocalNode in BlockPlacementPolicyDefault to be configurable (James Kinley and Nathan Roberts via kihwal) + HDFS-8578. On upgrade, Datanode should process all storage/data dirs in + parallel. (vinayakumarb and szetszwo via szetszwo) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 39e4fe489f5..a055e2ba11b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -836,6 +836,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.datanode.slow.io.warning.threshold.ms"; public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300; + // Number of parallel threads to load multiple datanode volumes + public static final String DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY = + "dfs.datanode.parallel.volumes.load.threads.num"; public static final String DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY = "dfs.datanode.block.id.layout.upgrade.threads"; public static final int DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS = 12; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java index acf10f1f52a..90a46698a86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -145,7 +146,8 @@ public class BlockPoolSliceStorage extends Storage { * @throws IOException */ private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo, - File dataDir, StartupOption startOpt, Configuration conf) + File dataDir, StartupOption startOpt, + List> callables, Configuration conf) throws IOException { StorageDirectory sd = new StorageDirectory(dataDir, null, true); try { @@ -172,19 +174,17 @@ public class BlockPoolSliceStorage extends Storage { // Each storage directory is treated individually. // During startup some of them can upgrade or roll back // while others could be up-to-date for the regular startup. - if (doTransition(sd, nsInfo, startOpt, conf)) { - return sd; - } + if (!doTransition(sd, nsInfo, startOpt, callables, conf)) { - if (getCTime() != nsInfo.getCTime()) { - throw new IOException("Datanode CTime (=" + getCTime() - + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")"); + // 3. Check CTime and update successfully loaded storage. + if (getCTime() != nsInfo.getCTime()) { + throw new IOException("Datanode CTime (=" + getCTime() + + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")"); + } + setServiceLayoutVersion(getServiceLayoutVersion()); + writeProperties(sd); } - // 3. Update successfully loaded storage. - setServiceLayoutVersion(getServiceLayoutVersion()); - writeProperties(sd); - return sd; } catch (IOException ioe) { sd.unlock(); @@ -208,7 +208,8 @@ public class BlockPoolSliceStorage extends Storage { */ List loadBpStorageDirectories(NamespaceInfo nsInfo, Collection dataDirs, StartupOption startOpt, - Configuration conf) throws IOException { + List> callables, Configuration conf) + throws IOException { List succeedDirs = Lists.newArrayList(); try { for (File dataDir : dataDirs) { @@ -218,7 +219,7 @@ public class BlockPoolSliceStorage extends Storage { "attempt to load an used block storage: " + dataDir); } final StorageDirectory sd = loadStorageDirectory( - nsInfo, dataDir, startOpt, conf); + nsInfo, dataDir, startOpt, callables, conf); succeedDirs.add(sd); } } catch (IOException e) { @@ -242,11 +243,12 @@ public class BlockPoolSliceStorage extends Storage { * @throws IOException on error */ List recoverTransitionRead(NamespaceInfo nsInfo, - Collection dataDirs, StartupOption startOpt, Configuration conf) + Collection dataDirs, StartupOption startOpt, + List> callables, Configuration conf) throws IOException { LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID()); final List loaded = loadBpStorageDirectories( - nsInfo, dataDirs, startOpt, conf); + nsInfo, dataDirs, startOpt, callables, conf); for (StorageDirectory sd : loaded) { addStorageDir(sd); } @@ -353,7 +355,8 @@ public class BlockPoolSliceStorage extends Storage { * @return true if the new properties has been written. */ private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo, - StartupOption startOpt, Configuration conf) throws IOException { + StartupOption startOpt, List> callables, + Configuration conf) throws IOException { if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) { Preconditions.checkState(!getTrashRootDir(sd).exists(), sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " + @@ -395,7 +398,7 @@ public class BlockPoolSliceStorage extends Storage { } if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION || this.cTime < nsInfo.getCTime()) { - doUpgrade(sd, nsInfo, conf); // upgrade + doUpgrade(sd, nsInfo, callables, conf); // upgrade return true; } // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime @@ -425,7 +428,9 @@ public class BlockPoolSliceStorage extends Storage { * @throws IOException on error */ private void doUpgrade(final StorageDirectory bpSd, - final NamespaceInfo nsInfo, final Configuration conf) throws IOException { + final NamespaceInfo nsInfo, + final List> callables, + final Configuration conf) throws IOException { // Upgrading is applicable only to release with federation or after if (!DataNodeLayoutVersion.supports( LayoutVersion.Feature.FEDERATION, layoutVersion)) { @@ -463,10 +468,21 @@ public class BlockPoolSliceStorage extends Storage { rename(bpCurDir, bpTmpDir); final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot(); - doUgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf); + if (callables == null) { + doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf); + } else { + callables.add(new Callable() { + @Override + public StorageDirectory call() throws Exception { + doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, + conf); + return bpSd; + } + }); + } } - private void doUgrade(String name, final StorageDirectory bpSd, + private void doUpgrade(String name, final StorageDirectory bpSd, NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir, final File bpCurDir, final int oldLV, Configuration conf) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index 7903194e075..66970547a30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -37,10 +37,12 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -51,6 +53,7 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.LayoutVersion; @@ -260,8 +263,8 @@ public class DataStorage extends Storage { } private StorageDirectory loadStorageDirectory(DataNode datanode, - NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) - throws IOException { + NamespaceInfo nsInfo, File dataDir, StartupOption startOpt, + List> callables) throws IOException { StorageDirectory sd = new StorageDirectory(dataDir, null, false); try { StorageState curState = sd.analyzeStorage(startOpt, this); @@ -287,13 +290,12 @@ public class DataStorage extends Storage { // Each storage directory is treated individually. // During startup some of them can upgrade or roll back // while others could be up-to-date for the regular startup. - if (doTransition(sd, nsInfo, startOpt, datanode.getConf())) { - return sd; - } + if (!doTransition(sd, nsInfo, startOpt, callables, datanode.getConf())) { - // 3. Update successfully loaded storage. - setServiceLayoutVersion(getServiceLayoutVersion()); - writeProperties(sd); + // 3. Update successfully loaded storage. + setServiceLayoutVersion(getServiceLayoutVersion()); + writeProperties(sd); + } return sd; } catch (IOException ioe) { @@ -325,7 +327,7 @@ public class DataStorage extends Storage { } StorageDirectory sd = loadStorageDirectory( - datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP); + datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP, null); VolumeBuilder builder = new VolumeBuilder(this, sd); for (NamespaceInfo nsInfo : nsInfos) { @@ -336,12 +338,35 @@ public class DataStorage extends Storage { final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo); final List dirs = bpStorage.loadBpStorageDirectories( - nsInfo, bpDataDirs, StartupOption.HOTSWAP, datanode.getConf()); + nsInfo, bpDataDirs, StartupOption.HOTSWAP, null, datanode.getConf()); builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs); } return builder; } + static int getParallelVolumeLoadThreadsNum(int dataDirs, Configuration conf) { + final String key + = DFSConfigKeys.DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY; + final int n = conf.getInt(key, dataDirs); + if (n < 1) { + throw new HadoopIllegalArgumentException(key + " = " + n + " < 1"); + } + final int min = Math.min(n, dataDirs); + LOG.info("Using " + min + " threads to upgrade data directories (" + + key + "=" + n + ", dataDirs=" + dataDirs + ")"); + return min; + } + + static class UpgradeTask { + private final StorageLocation dataDir; + private final Future future; + + UpgradeTask(StorageLocation dataDir, Future future) { + this.dataDir = dataDir; + this.future = future; + } + } + /** * Add a list of volumes to be managed by DataStorage. If the volume is empty, * format it, otherwise recover it from previous transitions if required. @@ -356,32 +381,62 @@ public class DataStorage extends Storage { synchronized List addStorageLocations(DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, StartupOption startOpt) throws IOException { - final List successLocations = loadDataStorage( - datanode, nsInfo, dataDirs, startOpt); - return loadBlockPoolSliceStorage( - datanode, nsInfo, successLocations, startOpt); + final int numThreads = getParallelVolumeLoadThreadsNum( + dataDirs.size(), datanode.getConf()); + final ExecutorService executor = Executors.newFixedThreadPool(numThreads); + try { + final List successLocations = loadDataStorage( + datanode, nsInfo, dataDirs, startOpt, executor); + return loadBlockPoolSliceStorage( + datanode, nsInfo, successLocations, startOpt, executor); + } finally { + executor.shutdown(); + } } private List loadDataStorage(DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, - StartupOption startOpt) throws IOException { + StartupOption startOpt, ExecutorService executor) throws IOException { final List success = Lists.newArrayList(); + final List tasks = Lists.newArrayList(); for (StorageLocation dataDir : dataDirs) { File root = dataDir.getFile(); if (!containsStorageDir(root)) { try { // It first ensures the datanode level format is completed. + final List> callables + = Lists.newArrayList(); final StorageDirectory sd = loadStorageDirectory( - datanode, nsInfo, root, startOpt); - addStorageDir(sd); + datanode, nsInfo, root, startOpt, callables); + if (callables.isEmpty()) { + addStorageDir(sd); + success.add(dataDir); + } else { + for(Callable c : callables) { + tasks.add(new UpgradeTask(dataDir, executor.submit(c))); + } + } } catch (IOException e) { LOG.warn("Failed to add storage directory " + dataDir, e); - continue; } } else { LOG.info("Storage directory " + dataDir + " has already been used."); + success.add(dataDir); + } + } + + if (!tasks.isEmpty()) { + LOG.info("loadDataStorage: " + tasks.size() + " upgrade tasks"); + for(UpgradeTask t : tasks) { + try { + addStorageDir(t.future.get()); + success.add(t.dataDir); + } catch (ExecutionException e) { + LOG.warn("Failed to upgrade storage directory " + t.dataDir, e); + } catch (InterruptedException e) { + throw DFSUtilClient.toInterruptedIOException("Task interrupted", e); + } } - success.add(dataDir); } return success; @@ -389,10 +444,11 @@ public class DataStorage extends Storage { private List loadBlockPoolSliceStorage(DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, - StartupOption startOpt) throws IOException { + StartupOption startOpt, ExecutorService executor) throws IOException { final String bpid = nsInfo.getBlockPoolID(); final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo); final List success = Lists.newArrayList(); + final List tasks = Lists.newArrayList(); for (StorageLocation dataDir : dataDirs) { final File curDir = new File(dataDir.getFile(), STORAGE_DIR_CURRENT); List bpDataDirs = new ArrayList(); @@ -400,10 +456,17 @@ public class DataStorage extends Storage { try { makeBlockPoolDataDir(bpDataDirs, null); + final List> callables = Lists.newArrayList(); final List dirs = bpStorage.recoverTransitionRead( - nsInfo, bpDataDirs, startOpt, datanode.getConf()); - for(StorageDirectory sd : dirs) { - success.add(sd); + nsInfo, bpDataDirs, startOpt, callables, datanode.getConf()); + if (callables.isEmpty()) { + for(StorageDirectory sd : dirs) { + success.add(sd); + } + } else { + for(Callable c : callables) { + tasks.add(new UpgradeTask(dataDir, executor.submit(c))); + } } } catch (IOException e) { LOG.warn("Failed to add storage directory " + dataDir @@ -411,6 +474,20 @@ public class DataStorage extends Storage { } } + if (!tasks.isEmpty()) { + LOG.info("loadBlockPoolSliceStorage: " + tasks.size() + " upgrade tasks"); + for(UpgradeTask t : tasks) { + try { + success.add(t.future.get()); + } catch (ExecutionException e) { + LOG.warn("Failed to upgrade storage directory " + t.dataDir + + " for block pool " + bpid, e); + } catch (InterruptedException e) { + throw DFSUtilClient.toInterruptedIOException("Task interrupted", e); + } + } + } + return success; } @@ -655,7 +732,8 @@ public class DataStorage extends Storage { * @return true if the new properties has been written. */ private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo, - StartupOption startOpt, Configuration conf) throws IOException { + StartupOption startOpt, List> callables, + Configuration conf) throws IOException { if (startOpt == StartupOption.ROLLBACK) { doRollback(sd, nsInfo); // rollback if applicable } @@ -697,7 +775,7 @@ public class DataStorage extends Storage { // simply update the properties. upgradeProperties(sd); } else { - doUpgradePreFederation(sd, nsInfo, conf); + doUpgradePreFederation(sd, nsInfo, callables, conf); } return true; // doUgrade already has written properties } @@ -734,7 +812,9 @@ public class DataStorage extends Storage { * @param sd storage directory */ void doUpgradePreFederation(final StorageDirectory sd, - final NamespaceInfo nsInfo, final Configuration conf) throws IOException { + final NamespaceInfo nsInfo, + final List> callables, + final Configuration conf) throws IOException { final int oldLV = getLayoutVersion(); LOG.info("Upgrading storage directory " + sd.getRoot() + ".\n old LV = " + oldLV @@ -767,10 +847,20 @@ public class DataStorage extends Storage { bpStorage.format(curDir, nsInfo); final File toDir = new File(curBpDir, STORAGE_DIR_CURRENT); - doUgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf); + if (callables == null) { + doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf); + } else { + callables.add(new Callable() { + @Override + public StorageDirectory call() throws Exception { + doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf); + return sd; + } + }); + } } - private void doUgrade(final StorageDirectory sd, + private void doUpgrade(final StorageDirectory sd, final NamespaceInfo nsInfo, final File prevDir, final File tmpDir, final File bbwDir, final File toDir, final int oldLV, Configuration conf) throws IOException {