From 662e17b46a0f41ade6a304e12925b70b5d09fc2f Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 28 Jan 2016 10:56:01 +0800 Subject: [PATCH] HDFS-9654. Code refactoring for HDFS-8578. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hadoop/hdfs/server/common/Storage.java | 3 +- .../datanode/BlockPoolSliceStorage.java | 131 ++++---- .../hdfs/server/datanode/DataStorage.java | 282 ++++++++++-------- .../hdfs/server/datanode/StorageLocation.java | 15 + .../apache/hadoop/hdfs/TestReplication.java | 3 +- .../apache/hadoop/hdfs/UpgradeUtilities.java | 2 +- .../server/datanode/SimulatedFSDataset.java | 2 +- .../datanode/TestDataNodeHotSwapVolumes.java | 48 ++- .../hdfs/server/datanode/TestDataStorage.java | 7 +- .../fsdataset/impl/TestFsDatasetImpl.java | 2 +- 11 files changed, 297 insertions(+), 200 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7e755586c92..a51dc155c9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -2665,6 +2665,8 @@ Release 2.7.3 - UNRELEASED HDFS-9634. webhdfs client side exceptions don't provide enough details (Eric Payne via kihwal) + HDFS-9654. Code refactoring for HDFS-8578. (szetszwo) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java index 7b4b571baeb..41719b94c77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java @@ -640,7 +640,8 @@ public void doRecover(StorageState curState) throws IOException { rename(getLastCheckpointTmp(), curDir); return; default: - throw new IOException("Unexpected FS state: " + curState); + throw new IOException("Unexpected FS state: " + curState + + " for storage directory: " + rootPath); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java index 1bbeeee1345..acf10f1f52a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java @@ -18,10 +18,21 @@ package org.apache.hadoop.hdfs.server.datanode; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.hdfs.protocol.LayoutVersion; @@ -34,18 +45,9 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.util.Daemon; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.Collections; -import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; /** * Manages storage for the set of BlockPoolSlices which share a particular @@ -136,15 +138,15 @@ public void addStorageDir(StorageDirectory sd) { /** * Load one storage directory. Recover from previous transitions if required. * - * @param datanode datanode instance * @param nsInfo namespace information * @param dataDir the root path of the storage directory * @param startOpt startup option * @return the StorageDirectory successfully loaded. * @throws IOException */ - private StorageDirectory loadStorageDirectory(DataNode datanode, - NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) throws IOException { + private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo, + File dataDir, StartupOption startOpt, Configuration conf) + throws IOException { StorageDirectory sd = new StorageDirectory(dataDir, null, true); try { StorageState curState = sd.analyzeStorage(startOpt, this); @@ -158,8 +160,8 @@ private StorageDirectory loadStorageDirectory(DataNode datanode, + " does not exist"); case NOT_FORMATTED: // format LOG.info("Block pool storage directory " + dataDir - + " is not formatted for " + nsInfo.getBlockPoolID()); - LOG.info("Formatting ..."); + + " is not formatted for " + nsInfo.getBlockPoolID() + + ". Formatting ..."); format(sd, nsInfo); break; default: // recovery part is common @@ -170,10 +172,13 @@ private StorageDirectory loadStorageDirectory(DataNode datanode, // Each storage directory is treated individually. // During startup some of them can upgrade or roll back // while others could be up-to-date for the regular startup. - doTransition(datanode, sd, nsInfo, startOpt); + if (doTransition(sd, nsInfo, startOpt, conf)) { + return sd; + } + if (getCTime() != nsInfo.getCTime()) { - throw new IOException( - "Data-node and name-node CTimes must be the same."); + throw new IOException("Datanode CTime (=" + getCTime() + + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")"); } // 3. Update successfully loaded storage. @@ -195,16 +200,15 @@ private StorageDirectory loadStorageDirectory(DataNode datanode, * Therefore, a failure on loading any block pool storage results a faulty * data volume. * - * @param datanode Datanode to which this storage belongs to * @param nsInfo namespace information * @param dataDirs storage directories of block pool * @param startOpt startup option * @return an array of loaded block pool directories. * @throws IOException on error */ - List loadBpStorageDirectories( - DataNode datanode, NamespaceInfo nsInfo, - Collection dataDirs, StartupOption startOpt) throws IOException { + List loadBpStorageDirectories(NamespaceInfo nsInfo, + Collection dataDirs, StartupOption startOpt, + Configuration conf) throws IOException { List succeedDirs = Lists.newArrayList(); try { for (File dataDir : dataDirs) { @@ -213,8 +217,8 @@ List loadBpStorageDirectories( "BlockPoolSliceStorage.recoverTransitionRead: " + "attempt to load an used block storage: " + dataDir); } - StorageDirectory sd = - loadStorageDirectory(datanode, nsInfo, dataDir, startOpt); + final StorageDirectory sd = loadStorageDirectory( + nsInfo, dataDir, startOpt, conf); succeedDirs.add(sd); } } catch (IOException e) { @@ -232,19 +236,21 @@ List loadBpStorageDirectories( * Therefore, a failure on loading any block pool storage results a faulty * data volume. * - * @param datanode Datanode to which this storage belongs to * @param nsInfo namespace information * @param dataDirs storage directories of block pool * @param startOpt startup option * @throws IOException on error */ - void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo, - Collection dataDirs, StartupOption startOpt) throws IOException { + List recoverTransitionRead(NamespaceInfo nsInfo, + Collection dataDirs, StartupOption startOpt, Configuration conf) + throws IOException { LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID()); - for (StorageDirectory sd : loadBpStorageDirectories( - datanode, nsInfo, dataDirs, startOpt)) { + final List loaded = loadBpStorageDirectories( + nsInfo, dataDirs, startOpt, conf); + for (StorageDirectory sd : loaded) { addStorageDir(sd); } + return loaded; } /** @@ -344,10 +350,10 @@ protected void setFieldsFromProperties(Properties props, StorageDirectory sd) * @param sd storage directory /current/ * @param nsInfo namespace info * @param startOpt startup option - * @throws IOException + * @return true if the new properties has been written. */ - private void doTransition(DataNode datanode, StorageDirectory sd, - NamespaceInfo nsInfo, StartupOption startOpt) throws IOException { + private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo, + StartupOption startOpt, Configuration conf) throws IOException { if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) { Preconditions.checkState(!getTrashRootDir(sd).exists(), sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " + @@ -379,7 +385,7 @@ private void doTransition(DataNode datanode, StorageDirectory sd, } if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION && this.cTime == nsInfo.getCTime()) { - return; // regular startup + return false; // regular startup } if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION) { int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd)); @@ -389,8 +395,8 @@ private void doTransition(DataNode datanode, StorageDirectory sd, } if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION || this.cTime < nsInfo.getCTime()) { - doUpgrade(datanode, sd, nsInfo); // upgrade - return; + doUpgrade(sd, nsInfo, conf); // upgrade + return true; } // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime // must shutdown @@ -418,16 +424,18 @@ private void doTransition(DataNode datanode, StorageDirectory sd, * @param nsInfo Namespace Info from the namenode * @throws IOException on error */ - void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo) - throws IOException { + private void doUpgrade(final StorageDirectory bpSd, + final NamespaceInfo nsInfo, final Configuration conf) throws IOException { // Upgrading is applicable only to release with federation or after if (!DataNodeLayoutVersion.supports( LayoutVersion.Feature.FEDERATION, layoutVersion)) { return; } + final int oldLV = getLayoutVersion(); LOG.info("Upgrading block pool storage directory " + bpSd.getRoot() - + ".\n old LV = " + this.getLayoutVersion() + "; old CTime = " - + this.getCTime() + ".\n new LV = " + HdfsServerConstants.DATANODE_LAYOUT_VERSION + + ".\n old LV = " + oldLV + + "; old CTime = " + this.getCTime() + + ".\n new LV = " + HdfsServerConstants.DATANODE_LAYOUT_VERSION + "; new CTime = " + nsInfo.getCTime()); // get /previous directory String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath()); @@ -438,8 +446,8 @@ void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo) if (dnPrevDir.exists()) { deleteDir(dnPrevDir); } - File bpCurDir = bpSd.getCurrentDir(); - File bpPrevDir = bpSd.getPreviousDir(); + final File bpCurDir = bpSd.getCurrentDir(); + final File bpPrevDir = bpSd.getPreviousDir(); assert bpCurDir.exists() : "BP level current directory must exist."; cleanupDetachDir(new File(bpCurDir, DataStorage.STORAGE_DIR_DETACHED)); @@ -447,15 +455,23 @@ void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo) if (bpPrevDir.exists()) { deleteDir(bpPrevDir); } - File bpTmpDir = bpSd.getPreviousTmp(); + final File bpTmpDir = bpSd.getPreviousTmp(); assert !bpTmpDir.exists() : "previous.tmp directory must not exist."; // 2. Rename /current//current to // /current//previous.tmp rename(bpCurDir, bpTmpDir); + final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot(); + doUgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf); + } + + private void doUgrade(String name, final StorageDirectory bpSd, + NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir, + final File bpCurDir, final int oldLV, Configuration conf) + throws IOException { // 3. Create new /current with block files hardlinks and VERSION - linkAllBlocks(datanode, bpTmpDir, bpCurDir); + linkAllBlocks(bpTmpDir, bpCurDir, oldLV, conf); this.layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION; assert this.namespaceID == nsInfo.getNamespaceID() : "Data-node and name-node layout versions must be the same."; @@ -465,8 +481,7 @@ void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo) // 4.rename /current//previous.tmp to // /current//previous rename(bpTmpDir, bpPrevDir); - LOG.info("Upgrade of block pool " + blockpoolID + " at " + bpSd.getRoot() - + " is complete"); + LOG.info("Upgrade of " + name + " is complete"); } /** @@ -640,17 +655,17 @@ public String toString() { * @param toDir the current data directory * @throws IOException if error occurs during hardlink */ - private void linkAllBlocks(DataNode datanode, File fromDir, File toDir) - throws IOException { + private static void linkAllBlocks(File fromDir, File toDir, + int diskLayoutVersion, Configuration conf) throws IOException { // do the link - int diskLayoutVersion = this.getLayoutVersion(); // hardlink finalized blocks in tmpDir HardLink hardLink = new HardLink(); - DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED), - new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); - DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_RBW), - new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink); - LOG.info( hardLink.linkStats.report() ); + DataStorage.linkBlocks(fromDir, toDir, DataStorage.STORAGE_DIR_FINALIZED, + diskLayoutVersion, hardLink, conf); + DataStorage.linkBlocks(fromDir, toDir, DataStorage.STORAGE_DIR_RBW, + diskLayoutVersion, hardLink, conf); + LOG.info("Linked blocks from " + fromDir + " to " + toDir + ". " + + hardLink.linkStats.report()); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index 707f1a58d91..57bb8b21412 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -18,11 +18,29 @@ package org.apache.hadoop.hdfs.server.datanode; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.Futures; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -47,28 +65,11 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DiskChecker; -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.channels.FileLock; -import java.nio.channels.OverlappingFileLockException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; /** * Data storage information file. @@ -104,7 +105,7 @@ public class DataStorage extends Storage { * upgraded from a pre-UUID version. For compatibility with prior * versions of Datanodes we cannot make this field a UUID. */ - private String datanodeUuid = null; + private volatile String datanodeUuid = null; // Maps block pool IDs to block pool storage private final Map bpStorageMap @@ -125,18 +126,28 @@ public DataStorage(StorageInfo storageInfo) { super(storageInfo); } - public synchronized String getDatanodeUuid() { + public String getDatanodeUuid() { return datanodeUuid; } - public synchronized void setDatanodeUuid(String newDatanodeUuid) { + public void setDatanodeUuid(String newDatanodeUuid) { this.datanodeUuid = newDatanodeUuid; } + private static boolean createStorageID(StorageDirectory sd, int lv) { + // Clusters previously upgraded from layout versions earlier than + // ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a + // new storage ID. We check for that and fix it now. + final boolean haveValidStorageId = DataNodeLayoutVersion.supports( + LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, lv) + && DatanodeStorage.isValidStorageId(sd.getStorageUuid()); + return createStorageID(sd, !haveValidStorageId); + } + /** Create an ID for this storage. * @return true if a new storage ID was generated. * */ - public synchronized boolean createStorageID( + public static boolean createStorageID( StorageDirectory sd, boolean regenerateStorageIds) { final String oldStorageID = sd.getStorageUuid(); if (oldStorageID == null || regenerateStorageIds) { @@ -250,7 +261,7 @@ public void build() { private StorageDirectory loadStorageDirectory(DataNode datanode, NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) - throws IOException { + throws IOException { StorageDirectory sd = new StorageDirectory(dataDir, null, false); try { StorageState curState = sd.analyzeStorage(startOpt, this); @@ -263,9 +274,9 @@ private StorageDirectory loadStorageDirectory(DataNode datanode, throw new IOException("Storage directory " + dataDir + " does not exist"); case NOT_FORMATTED: // format - LOG.info("Storage directory " + dataDir + " is not formatted for " - + nsInfo.getBlockPoolID()); - LOG.info("Formatting ..."); + LOG.info("Storage directory " + dataDir + + " is not formatted for namespace " + nsInfo.getNamespaceID() + + ". Formatting..."); format(sd, nsInfo, datanode.getDatanodeUuid()); break; default: // recovery part is common @@ -276,7 +287,9 @@ private StorageDirectory loadStorageDirectory(DataNode datanode, // Each storage directory is treated individually. // During startup some of them can upgrade or roll back // while others could be up-to-date for the regular startup. - doTransition(datanode, sd, nsInfo, startOpt); + if (doTransition(sd, nsInfo, startOpt, datanode.getConf())) { + return sd; + } // 3. Update successfully loaded storage. setServiceLayoutVersion(getServiceLayoutVersion()); @@ -321,20 +334,10 @@ public VolumeBuilder prepareVolume(DataNode datanode, File volume, nsInfo.getBlockPoolID(), new File(volume, STORAGE_DIR_CURRENT))); makeBlockPoolDataDir(bpDataDirs, null); - BlockPoolSliceStorage bpStorage; - final String bpid = nsInfo.getBlockPoolID(); - synchronized (this) { - bpStorage = this.bpStorageMap.get(bpid); - if (bpStorage == null) { - bpStorage = new BlockPoolSliceStorage( - nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(), - nsInfo.getClusterID()); - addBlockPoolStorage(bpid, bpStorage); - } - } - builder.addBpStorageDirectories( - bpid, bpStorage.loadBpStorageDirectories( - datanode, nsInfo, bpDataDirs, StartupOption.HOTSWAP)); + final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo); + final List dirs = bpStorage.loadBpStorageDirectories( + nsInfo, bpDataDirs, StartupOption.HOTSWAP, datanode.getConf()); + builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs); } return builder; } @@ -347,53 +350,68 @@ public VolumeBuilder prepareVolume(DataNode datanode, File volume, * @param nsInfo namespace information * @param dataDirs array of data storage directories * @param startOpt startup option - * @return a list of successfully loaded volumes. - * @throws IOException + * @return a list of successfully loaded storage directories. */ @VisibleForTesting - synchronized List addStorageLocations(DataNode datanode, + synchronized List addStorageLocations(DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, StartupOption startOpt) throws IOException { - final String bpid = nsInfo.getBlockPoolID(); - List successVolumes = Lists.newArrayList(); + final List successLocations = loadDataStorage( + datanode, nsInfo, dataDirs, startOpt); + return loadBlockPoolSliceStorage( + datanode, nsInfo, successLocations, startOpt); + } + + private List loadDataStorage(DataNode datanode, + NamespaceInfo nsInfo, Collection dataDirs, + StartupOption startOpt) throws IOException { + final List success = Lists.newArrayList(); for (StorageLocation dataDir : dataDirs) { File root = dataDir.getFile(); if (!containsStorageDir(root)) { try { // It first ensures the datanode level format is completed. - StorageDirectory sd = loadStorageDirectory( + final StorageDirectory sd = loadStorageDirectory( datanode, nsInfo, root, startOpt); addStorageDir(sd); } catch (IOException e) { - LOG.warn(e); + LOG.warn("Failed to add storage directory " + dataDir, e); continue; } } else { LOG.info("Storage directory " + dataDir + " has already been used."); } + success.add(dataDir); + } + return success; + } + + private List loadBlockPoolSliceStorage(DataNode datanode, + NamespaceInfo nsInfo, Collection dataDirs, + StartupOption startOpt) throws IOException { + final String bpid = nsInfo.getBlockPoolID(); + final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo); + final List success = Lists.newArrayList(); + for (StorageLocation dataDir : dataDirs) { + final File curDir = new File(dataDir.getFile(), STORAGE_DIR_CURRENT); List bpDataDirs = new ArrayList(); - bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(bpid, new File(root, - STORAGE_DIR_CURRENT))); + bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(bpid, curDir)); try { makeBlockPoolDataDir(bpDataDirs, null); - BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid); - if (bpStorage == null) { - bpStorage = new BlockPoolSliceStorage( - nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(), - nsInfo.getClusterID()); - } - bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt); - addBlockPoolStorage(bpid, bpStorage); + final List dirs = bpStorage.recoverTransitionRead( + nsInfo, bpDataDirs, startOpt, datanode.getConf()); + for(StorageDirectory sd : dirs) { + success.add(sd); + } } catch (IOException e) { - LOG.warn("Failed to add storage for block pool: " + bpid + " : " - + e.getMessage()); - continue; + LOG.warn("Failed to add storage directory " + dataDir + + " for block pool " + bpid, e); } - successVolumes.add(dataDir); } - return successVolumes; + + return success; } /** @@ -635,17 +653,13 @@ void readProperties(StorageDirectory sd, int rollbackLayoutVersion) * Upgrade if this.LV > LAYOUT_VERSION * Regular startup if this.LV = LAYOUT_VERSION * - * @param datanode Datanode to which this storage belongs to * @param sd storage directory * @param nsInfo namespace info * @param startOpt startup option - * @throws IOException + * @return true if the new properties has been written. */ - private void doTransition( DataNode datanode, - StorageDirectory sd, - NamespaceInfo nsInfo, - StartupOption startOpt - ) throws IOException { + private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo, + StartupOption startOpt, Configuration conf) throws IOException { if (startOpt == StartupOption.ROLLBACK) { doRollback(sd, nsInfo); // rollback if applicable } @@ -674,25 +688,16 @@ private void doTransition( DataNode datanode, + nsInfo.getClusterID() + "; datanode clusterID = " + getClusterID()); } - // Clusters previously upgraded from layout versions earlier than - // ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a - // new storage ID. We check for that and fix it now. - boolean haveValidStorageId = - DataNodeLayoutVersion.supports( - LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, layoutVersion) && - DatanodeStorage.isValidStorageId(sd.getStorageUuid()); - // regular start up. if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION) { - createStorageID(sd, !haveValidStorageId); - return; // regular startup + createStorageID(sd, layoutVersion); + return false; // need to write properties } // do upgrade if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION) { - doUpgrade(datanode, sd, nsInfo); // upgrade - createStorageID(sd, !haveValidStorageId); - return; + doUpgrade(sd, nsInfo, conf); // upgrade + return true; // doUgrade already has written properties } // layoutVersion < DATANODE_LAYOUT_VERSION. I.e. stored layout version is newer @@ -726,8 +731,8 @@ private void doTransition( DataNode datanode, * @param sd storage directory * @throws IOException on error */ - void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo) - throws IOException { + void doUpgrade(final StorageDirectory sd, final NamespaceInfo nsInfo, + final Configuration conf) throws IOException { // If the existing on-disk layout version supportes federation, simply // update its layout version. if (DataNodeLayoutVersion.supports( @@ -743,15 +748,16 @@ void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo) return; } + final int oldLV = getLayoutVersion(); LOG.info("Upgrading storage directory " + sd.getRoot() - + ".\n old LV = " + this.getLayoutVersion() + + ".\n old LV = " + oldLV + "; old CTime = " + this.getCTime() + ".\n new LV = " + HdfsServerConstants.DATANODE_LAYOUT_VERSION + "; new CTime = " + nsInfo.getCTime()); - File curDir = sd.getCurrentDir(); - File prevDir = sd.getPreviousDir(); - File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW); + final File curDir = sd.getCurrentDir(); + final File prevDir = sd.getPreviousDir(); + final File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW); assert curDir.exists() : "Data node current directory must exist."; // Cleanup directory "detach" @@ -761,21 +767,29 @@ void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo) if (prevDir.exists()) deleteDir(prevDir); // get previous.tmp directory, /previous.tmp - File tmpDir = sd.getPreviousTmp(); + final File tmpDir = sd.getPreviousTmp(); assert !tmpDir.exists() : "Data node previous.tmp directory must not exist."; // 2. Rename /current to /previous.tmp rename(curDir, tmpDir); - // 3. Format BP and hard link blocks from previous directory + // 3.1. Format BP File curBpDir = BlockPoolSliceStorage.getBpRoot(nsInfo.getBlockPoolID(), curDir); - BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), - nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID()); + BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo); bpStorage.format(curDir, nsInfo); - linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir, - STORAGE_DIR_CURRENT)); - + + final File toDir = new File(curBpDir, STORAGE_DIR_CURRENT); + doUgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf); + } + + private void doUgrade(final StorageDirectory sd, + final NamespaceInfo nsInfo, final File prevDir, + final File tmpDir, final File bbwDir, final File toDir, final int oldLV, + Configuration conf) throws IOException { + // 3.2. Link block files from /previous.tmp to /current + linkAllBlocks(tmpDir, bbwDir, toDir, oldLV, conf); + // 4. Write version file under /current layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION; clusterID = nsInfo.getClusterID(); @@ -784,7 +798,8 @@ void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo) // 5. Rename /previous.tmp to /previous rename(tmpDir, prevDir); LOG.info("Upgrade of " + sd.getRoot()+ " is complete"); - addBlockPoolStorage(nsInfo.getBlockPoolID(), bpStorage); + + createStorageID(sd, layoutVersion); } /** @@ -952,23 +967,22 @@ void finalizeUpgrade(String bpID) throws IOException { * * @throws IOException If error occurs during hardlink */ - private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir, - File toDir) throws IOException { + private static void linkAllBlocks(File fromDir, File fromBbwDir, File toDir, + int diskLayoutVersion, Configuration conf) throws IOException { HardLink hardLink = new HardLink(); // do the link - int diskLayoutVersion = this.getLayoutVersion(); if (DataNodeLayoutVersion.supports( LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) { // hardlink finalized blocks in tmpDir/finalized - linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED), - new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); + linkBlocks(fromDir, toDir, STORAGE_DIR_FINALIZED, + diskLayoutVersion, hardLink, conf); // hardlink rbw blocks in tmpDir/rbw - linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW), - new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); + linkBlocks(fromDir, toDir, STORAGE_DIR_RBW, + diskLayoutVersion, hardLink, conf); } else { // pre-RBW version // hardlink finalized blocks in tmpDir - linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED), - diskLayoutVersion, hardLink); + linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED), + diskLayoutVersion, hardLink, conf); if (fromBbwDir.exists()) { /* * We need to put the 'blocksBeingWritten' from HDFS 1.x into the rbw @@ -976,11 +990,12 @@ private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir, * NOT underneath the 'current' directory in those releases. See * HDFS-3731 for details. */ - linkBlocks(datanode, fromBbwDir, - new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); + linkBlocks(fromBbwDir, new File(toDir, STORAGE_DIR_RBW), + diskLayoutVersion, hardLink, conf); } - } - LOG.info( hardLink.linkStats.report() ); + } + LOG.info("Linked blocks from " + fromDir + " to " + toDir + ". " + + hardLink.linkStats.report()); } private static class LinkArgs { @@ -993,8 +1008,15 @@ private static class LinkArgs { } } - static void linkBlocks(DataNode datanode, File from, File to, int oldLV, - HardLink hl) throws IOException { + static void linkBlocks(File fromDir, File toDir, String subdir, int oldLV, + HardLink hl, Configuration conf) throws IOException { + linkBlocks(new File(fromDir, subdir), new File(toDir, subdir), + oldLV, hl, conf); + } + + private static void linkBlocks(File from, File to, int oldLV, + HardLink hl, Configuration conf) throws IOException { + LOG.info("Start linking block files from " + from + " to " + to); boolean upgradeToIdBasedLayout = false; // If we are upgrading from a version older than the one where we introduced // block ID-based layout AND we're working with the finalized directory, @@ -1017,7 +1039,7 @@ static void linkBlocks(DataNode datanode, File from, File to, int oldLV, removeDuplicateEntries(idBasedLayoutSingleLinks, duplicates); } - int numLinkWorkers = datanode.getConf().getInt( + final int numLinkWorkers = conf.getInt( DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY, DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS); ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers); @@ -1268,13 +1290,19 @@ public boolean accept(File dir, String name) { } /** - * Add bpStorage into bpStorageMap + * Get the BlockPoolSliceStorage from {@link bpStorageMap}. + * If the object is not found, create a new object and put it to the map. */ - private void addBlockPoolStorage(String bpID, BlockPoolSliceStorage bpStorage - ) { - if (!this.bpStorageMap.containsKey(bpID)) { - this.bpStorageMap.put(bpID, bpStorage); + synchronized BlockPoolSliceStorage getBlockPoolSliceStorage( + final NamespaceInfo nsInfo) { + final String bpid = nsInfo.getBlockPoolID(); + BlockPoolSliceStorage bpStorage = bpStorageMap.get(bpid); + if (bpStorage == null) { + bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), bpid, + nsInfo.getCTime(), nsInfo.getClusterID()); + bpStorageMap.put(bpid, bpStorage); } + return bpStorage; } synchronized void removeBlockPoolStorage(String bpId) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java index 46e8e8a844a..3162c5c6c9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java @@ -101,4 +101,19 @@ public static StorageLocation parse(String rawLocation) public String toString() { return "[" + storageType + "]" + file.toURI(); } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj == null || !(obj instanceof StorageLocation)) { + return false; + } + return toString().equals(obj.toString()); + } + + @Override + public int hashCode() { + return toString().hashCode(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java index d9c96ab4ddd..ca1092d84ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java @@ -508,7 +508,8 @@ public void testReplicationWhenBlockCorruption() throws Exception { Configuration conf = new HdfsConfiguration(); conf.setLong( DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + .storagesPerDatanode(1).build(); FileSystem fs = cluster.getFileSystem(); Path filePath = new Path("/test"); FSDataOutputStream create = fs.create(filePath); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java index 9ac58ba65b5..138cc9610e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java @@ -477,7 +477,7 @@ public static void createDataNodeVersionFile(File[] parent, for (int i = 0; i < parent.length; i++) { File versionFile = new File(parent[i], "VERSION"); StorageDirectory sd = new StorageDirectory(parent[i].getParentFile()); - storage.createStorageID(sd, false); + DataStorage.createStorageID(sd, false); storage.writeProperties(versionFile, sd); versionFiles[i] = versionFile; File bpDir = BlockPoolSliceStorage.getBpRoot(bpid, parent[i]); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 7a10379304e..c843938e2ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -537,7 +537,7 @@ public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration this.datanode = datanode; if (storage != null) { for (int i = 0; i < storage.getNumStorageDirs(); ++i) { - storage.createStorageID(storage.getStorageDir(i), false); + DataStorage.createStorageID(storage.getStorageDir(i), false); } this.datanodeUuid = storage.getDatanodeUuid(); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index a396b0e411a..212d2e6ec60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -51,10 +51,12 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Random; @@ -288,15 +290,27 @@ private void addVolumes(int numNewVolumes) String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(","); String[] expectDataDirs = newDataDir.split(","); assertEquals(expectDataDirs.length, effectiveDataDirs.length); + List expectedStorageLocations = new ArrayList<>(); + List effectiveStorageLocations = new ArrayList<>(); for (int i = 0; i < expectDataDirs.length; i++) { StorageLocation expectLocation = StorageLocation.parse(expectDataDirs[i]); - StorageLocation effectiveLocation = - StorageLocation.parse(effectiveDataDirs[i]); - assertEquals(expectLocation.getStorageType(), - effectiveLocation.getStorageType()); - assertEquals(expectLocation.getFile().getCanonicalFile(), - effectiveLocation.getFile().getCanonicalFile()); + StorageLocation effectiveLocation = StorageLocation + .parse(effectiveDataDirs[i]); + expectedStorageLocations.add(expectLocation); + effectiveStorageLocations.add(effectiveLocation); } + Comparator comparator = new Comparator() { + + @Override + public int compare(StorageLocation o1, StorageLocation o2) { + return o1.toString().compareTo(o2.toString()); + } + + }; + Collections.sort(expectedStorageLocations, comparator); + Collections.sort(effectiveStorageLocations, comparator); + assertEquals("Effective volumes doesnt match expected", + expectedStorageLocations, effectiveStorageLocations); // Check that all newly created volumes are appropriately formatted. for (File volumeDir : newVolumeDirs) { @@ -473,11 +487,27 @@ public void testReplicatingAfterRemoveVolume() DataNode dn = cluster.getDataNodes().get(0); Collection oldDirs = getDataDirs(dn); - String newDirs = oldDirs.iterator().next(); // Keep the first volume. + // Findout the storage with block and remove it + ExtendedBlock block = + DFSTestUtil.getAllBlocks(fs, testFile).get(1).getBlock(); + FsVolumeSpi volumeWithBlock = dn.getFSDataset().getVolume(block); + String basePath = volumeWithBlock.getBasePath(); + File storageDir = new File(basePath); + URI fileUri = storageDir.toURI(); + String dirWithBlock = + "[" + volumeWithBlock.getStorageType() + "]" + fileUri; + String newDirs = dirWithBlock; + for (String dir : oldDirs) { + if (dirWithBlock.startsWith(dir)) { + continue; + } + newDirs = dir; + break; + } dn.reconfigurePropertyImpl( DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs); - assertFileLocksReleased( - new ArrayList(oldDirs).subList(1, oldDirs.size())); + oldDirs.remove(newDirs); + assertFileLocksReleased(oldDirs); triggerDeleteReport(dn); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java index c90b8e5d4f4..c55dbae7f65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java @@ -18,9 +18,12 @@ package org.apache.hadoop.hdfs.server.datanode; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; @@ -54,11 +57,13 @@ public class TestDataStorage { @Before public void setUp() throws IOException { + Configuration conf = new HdfsConfiguration(); storage = new DataStorage(); nsInfo = new NamespaceInfo(0, CLUSTER_ID, DEFAULT_BPID, CTIME, BUILD_VERSION, SOFTWARE_VERSION); FileUtil.fullyDelete(TEST_DIR); assertTrue("Failed to make test dir.", TEST_DIR.mkdirs()); + Mockito.when(mockDN.getConf()).thenReturn(conf); } @After @@ -146,7 +151,7 @@ public void testAddStorageDirectories() throws IOException, assertEquals(numLocations, storage.getNumStorageDirs()); locations = createStorageLocations(numLocations); - List addedLocation = + List addedLocation = storage.addStorageLocations(mockDN, namespaceInfos.get(0), locations, START_OPT); assertTrue(addedLocation.isEmpty()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index cdc1d6183b8..261a8b0ed0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -110,7 +110,7 @@ public class TestFsDatasetImpl { private static Storage.StorageDirectory createStorageDirectory(File root) { Storage.StorageDirectory sd = new Storage.StorageDirectory(root); - dsForStorageUuid.createStorageID(sd, false); + DataStorage.createStorageID(sd, false); return sd; }