From bfd3f8bd8a9ae2186ec3e4addc71f912ec7b8923 Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Sat, 30 Sep 2017 06:31:52 -0700 Subject: [PATCH] HDFS-12291: [SPS]: Provide a mechanism to recursively iterate and satisfy storage policy of all the files under the given dir. Contributed by Surendra Singh Lilhore. --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 + .../java/org/apache/hadoop/hdfs/DFSUtil.java | 22 +- .../BlockStorageMovementAttemptedItems.java | 8 +- .../namenode/BlockStorageMovementNeeded.java | 277 ++++++++++--- .../server/namenode/ReencryptionHandler.java | 1 + .../namenode/StoragePolicySatisfier.java | 43 +- .../src/main/resources/hdfs-default.xml | 23 ++ .../src/site/markdown/ArchivalStorage.md | 3 +- ...estBlockStorageMovementAttemptedItems.java | 2 +- .../TestPersistentStoragePolicySatisfier.java | 8 +- .../namenode/TestStoragePolicySatisfier.java | 377 +++++++++++++++++- 11 files changed, 689 insertions(+), 83 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index e66806f643d..c90ca3387a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -618,6 +618,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.storage.policy.satisfier.enabled"; public static final boolean DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT = false; + public static final String DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY = + "dfs.storage.policy.satisfier.queue.limit"; + public static final int DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT = + 1000; + public static final String DFS_SPS_WORK_MULTIPLIER_PER_ITERATION = + "dfs.storage.policy.satisfier.work.multiplier.per.iteration"; + public static final int DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT = + 1; public static final String DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY = "dfs.storage.policy.satisfier.recheck.timeout.millis"; public static final int DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index f5ceeafc5d1..c26599cd878 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -1457,7 +1457,27 @@ public static int getReplWorkMultiplier(Configuration conf) { "It should be a positive, non-zero integer value."); return blocksReplWorkMultiplier; } - + + /** + * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from + * configuration. + * + * @param conf Configuration + * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION + */ + public static int getSPSWorkMultiplier(Configuration conf) { + int spsWorkMultiplier = conf + .getInt( + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION, + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT); + Preconditions.checkArgument( + (spsWorkMultiplier > 0), + DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION + + " = '" + spsWorkMultiplier + "' is invalid. 
" + + "It should be a positive, non-zero integer value."); + return spsWorkMultiplier; + } + /** * Get SPNEGO keytab Key from configuration * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java index 278b62b47e1..549819fabed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java @@ -101,7 +101,7 @@ public BlockStorageMovementAttemptedItems(long recheckTimeout, public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) { synchronized (storageMovementAttemptedItems) { AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo( - itemInfo.getRootId(), itemInfo.getTrackId(), monotonicNow(), + itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(), allBlockLocsAttemptedToSatisfy); storageMovementAttemptedItems.put(itemInfo.getTrackId(), attemptedItemInfo); @@ -260,7 +260,7 @@ void blocksStorageMovementUnReportedItemsCheck() { synchronized (storageMovementAttemptedResults) { if (!isExistInResult(blockCollectionID)) { ItemInfo candidate = new ItemInfo( - itemInfo.getRootId(), blockCollectionID); + itemInfo.getStartId(), blockCollectionID); blockStorageMovementNeeded.add(candidate); iter.remove(); LOG.info("TrackID: {} becomes timed out and moved to needed " @@ -315,7 +315,7 @@ void blockStorageMovementResultCheck() throws IOException { // blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning // the xAttr ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null) - ? attemptedItemInfo.getRootId() : trackId, trackId); + ? 
attemptedItemInfo.getStartId() : trackId, trackId); switch (status) { case FAILURE: if (attemptedItemInfo != null) { @@ -345,7 +345,7 @@ void blockStorageMovementResultCheck() throws IOException { if (attemptedItemInfo != null) { if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) { blockStorageMovementNeeded - .add(new ItemInfo(attemptedItemInfo.getRootId(), trackId)); + .add(new ItemInfo(attemptedItemInfo.getStartId(), trackId)); LOG.warn("{} But adding trackID back to retry queue as some of" + " the blocks couldn't find matching target nodes in" + " previous SPS iteration.", msg); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java index 41a3a6c3fe1..788a98b40b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java @@ -29,12 +29,15 @@ import java.util.Queue; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo; import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo; -import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * A Class to track the block collection IDs (Inode's ID) for which physical * storage movement needed as per the Namespace and StorageReports from DN. @@ -53,11 +56,11 @@ public class BlockStorageMovementNeeded { new LinkedList(); /** - * Map of rootId and number of child's. Number of child's indicate the number - * of files pending to satisfy the policy. + * Map of startId and number of child's. Number of child's indicate the + * number of files pending to satisfy the policy. */ - private final Map pendingWorkForDirectory = - new HashMap(); + private final Map pendingWorkForDirectory = + new HashMap(); private final Namesystem namesystem; @@ -66,12 +69,15 @@ public class BlockStorageMovementNeeded { private final StoragePolicySatisfier sps; - private Daemon fileInodeIdCollector; + private Daemon inodeIdCollector; + + private final int maxQueuedItem; public BlockStorageMovementNeeded(Namesystem namesystem, - StoragePolicySatisfier sps) { + StoragePolicySatisfier sps, int queueLimit) { this.namesystem = namesystem; this.sps = sps; + this.maxQueuedItem = queueLimit; } /** @@ -88,15 +94,24 @@ public synchronized void add(ItemInfo trackInfo) { /** * Add the itemInfo to tracking list for which storage movement * expected if necessary. 
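 * Every item in the list carries the same start inode id: the directory on which satisfyStoragePolicy was invoked.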
- * @param rootId - * - root inode id + * @param startId + * - start id * @param itemInfoList * - List of child in the directory */ - private synchronized void addAll(Long rootId, - List itemInfoList) { + @VisibleForTesting + public synchronized void addAll(long startId, + List itemInfoList, boolean scanCompleted) { storageMovementNeeded.addAll(itemInfoList); - pendingWorkForDirectory.put(rootId, itemInfoList.size()); + DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); + if (pendingWork == null) { + pendingWork = new DirPendingWorkInfo(); + pendingWorkForDirectory.put(startId, pendingWork); + } + pendingWork.addPendingWorkCount(itemInfoList.size()); + if (scanCompleted) { + pendingWork.markScanCompleted(); + } } /** @@ -118,6 +133,25 @@ public synchronized void addToPendingDirQueue(long id) { } } + /** + * Returns queue remaining capacity. + */ + public synchronized int remainingCapacity() { + int size = storageMovementNeeded.size(); + if (size >= maxQueuedItem) { + return 0; + } else { + return (maxQueuedItem - size); + } + } + + /** + * Returns queue size. + */ + public synchronized int size() { + return storageMovementNeeded.size(); + } + public synchronized void clearAll() { spsDirsToBeTraveresed.clear(); storageMovementNeeded.clear(); @@ -131,20 +165,20 @@ public synchronized void clearAll() { public synchronized void removeItemTrackInfo(ItemInfo trackInfo) throws IOException { if (trackInfo.isDir()) { - // If track is part of some root then reduce the pending directory work - // count. - long rootId = trackInfo.getRootId(); - INode inode = namesystem.getFSDirectory().getInode(rootId); + // If track is part of some start inode then reduce the pending + // directory work count. + long startId = trackInfo.getStartId(); + INode inode = namesystem.getFSDirectory().getInode(startId); if (inode == null) { // directory deleted just remove it. - this.pendingWorkForDirectory.remove(rootId); + this.pendingWorkForDirectory.remove(startId); } else { - if (pendingWorkForDirectory.get(rootId) != null) { - Integer pendingWork = pendingWorkForDirectory.get(rootId) - 1; - pendingWorkForDirectory.put(rootId, pendingWork); - if (pendingWork <= 0) { - namesystem.removeXattr(rootId, XATTR_SATISFY_STORAGE_POLICY); - pendingWorkForDirectory.remove(rootId); + DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); + if (pendingWork != null) { + pendingWork.decrementPendingWorkCount(); + if (pendingWork.isDirWorkDone()) { + namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY); + pendingWorkForDirectory.remove(startId); } } } @@ -161,7 +195,7 @@ public synchronized void clearQueue(long trackId) { Iterator iterator = storageMovementNeeded.iterator(); while (iterator.hasNext()) { ItemInfo next = iterator.next(); - if (next.getRootId() == trackId) { + if (next.getStartId() == trackId) { iterator.remove(); } } @@ -208,7 +242,17 @@ public synchronized void clearQueuesWithNotification() { * Take dir tack ID from the spsDirsToBeTraveresed queue and collect child * ID's to process for satisfy the policy. 
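 * Children are queued in batches bounded by dfs.storage.policy.satisfier.queue.limit so that large directory trees do not flood the queue.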
*/ - private class FileInodeIdCollector implements Runnable { + private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser + implements Runnable { + + private int remainingCapacity = 0; + + private List currentBatch = new ArrayList<>(maxQueuedItem); + + StorageMovementPendingInodeIdCollector(FSDirectory dir) { + super(dir); + } + @Override public void run() { LOG.info("Starting FileInodeIdCollector!."); @@ -216,38 +260,36 @@ public void run() { try { if (!namesystem.isInSafeMode()) { FSDirectory fsd = namesystem.getFSDirectory(); - Long rootINodeId = spsDirsToBeTraveresed.poll(); - if (rootINodeId == null) { + Long startINodeId = spsDirsToBeTraveresed.poll(); + if (startINodeId == null) { // Waiting for SPS path synchronized (spsDirsToBeTraveresed) { spsDirsToBeTraveresed.wait(5000); } } else { - INode rootInode = fsd.getInode(rootINodeId); - if (rootInode != null) { - // TODO : HDFS-12291 - // 1. Implement an efficient recursive directory iteration - // mechanism and satisfies storage policy for all the files - // under the given directory. - // 2. Process files in batches,so datanodes workload can be - // handled. - List itemInfoList = - new ArrayList<>(); - for (INode childInode : rootInode.asDirectory() - .getChildrenList(Snapshot.CURRENT_STATE_ID)) { - if (childInode.isFile() - && childInode.asFile().numBlocks() != 0) { - itemInfoList.add( - new ItemInfo(rootINodeId, childInode.getId())); - } + INode startInode = fsd.getInode(startINodeId); + if (startInode != null) { + try { + remainingCapacity = remainingCapacity(); + readLock(); + traverseDir(startInode.asDirectory(), startINodeId, + HdfsFileStatus.EMPTY_NAME, + new SPSTraverseInfo(startINodeId)); + } finally { + readUnlock(); } - if (itemInfoList.isEmpty()) { - // satisfy track info is empty, so remove the xAttr from the - // directory - namesystem.removeXattr(rootINodeId, + // Mark startInode traverse is done + addAll(startInode.getId(), currentBatch, true); + currentBatch.clear(); + + // check if directory was empty and no child added to queue + DirPendingWorkInfo dirPendingWorkInfo = + pendingWorkForDirectory.get(startInode.getId()); + if (dirPendingWorkInfo.isDirWorkDone()) { + namesystem.removeXattr(startInode.getId(), XATTR_SATISFY_STORAGE_POLICY); + pendingWorkForDirectory.remove(startInode.getId()); } - addAll(rootINodeId, itemInfoList); } } } @@ -256,17 +298,140 @@ public void run() { } } } + + @Override + protected void checkPauseForTesting() throws InterruptedException { + // TODO implement if needed + } + + @Override + protected boolean processFileInode(INode inode, TraverseInfo traverseInfo) + throws IOException, InterruptedException { + assert getFSDirectory().hasReadLock(); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing {} for statisy the policy", + inode.getFullPathName()); + } + if (!inode.isFile()) { + return false; + } + if (inode.isFile() && inode.asFile().numBlocks() != 0) { + currentBatch.add(new ItemInfo( + ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId())); + remainingCapacity--; + } + return true; + } + + @Override + protected boolean canSubmitCurrentBatch() { + return remainingCapacity <= 0; + } + + @Override + protected void checkINodeReady(long startId) throws IOException { + FSNamesystem fsn = ((FSNamesystem) namesystem); + fsn.checkNameNodeSafeMode("NN is in safe mode," + + "cannot satisfy the policy."); + // SPS work should be cancelled when NN goes to standby. Just + // double checking for sanity. 
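+ // checkOperation(WRITE) throws StandbyException once this NameNode stops being active, aborting the traversal.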
+ fsn.checkOperation(NameNode.OperationCategory.WRITE); + } + + @Override + protected void submitCurrentBatch(long startId) + throws IOException, InterruptedException { + // Add current child's to queue + addAll(startId, currentBatch, false); + currentBatch.clear(); + } + + @Override + protected void throttle() throws InterruptedException { + assert !getFSDirectory().hasReadLock(); + assert !namesystem.hasReadLock(); + if (LOG.isDebugEnabled()) { + LOG.debug("StorageMovementNeeded queue remaining capacity is zero," + + " waiting for some free slots."); + } + remainingCapacity = remainingCapacity(); + // wait for queue to be free + while (remainingCapacity <= 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for storageMovementNeeded queue to be free!"); + } + Thread.sleep(5000); + remainingCapacity = remainingCapacity(); + } + } + + @Override + protected boolean canTraverseDir(INode inode) throws IOException { + return true; + } } - public void start() { - fileInodeIdCollector = new Daemon(new FileInodeIdCollector()); - fileInodeIdCollector.setName("FileInodeIdCollector"); - fileInodeIdCollector.start(); + /** + * Info for directory recursive scan. + */ + public static class DirPendingWorkInfo { + + private int pendingWorkCount = 0; + private boolean fullyScanned = false; + + /** + * Increment the pending work count for directory. + */ + public synchronized void addPendingWorkCount(int count) { + this.pendingWorkCount = this.pendingWorkCount + count; + } + + /** + * Decrement the pending work count for directory one track info is + * completed. + */ + public synchronized void decrementPendingWorkCount() { + this.pendingWorkCount--; + } + + /** + * Return true if all the pending work is done and directory fully + * scanned, otherwise false. + */ + public synchronized boolean isDirWorkDone() { + return (pendingWorkCount <= 0 && fullyScanned); + } + + /** + * Mark directory scan is completed. 
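+ * Once marked, isDirWorkDone() returns true as soon as the pending work count drops to zero.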
+ */ + public synchronized void markScanCompleted() { + this.fullyScanned = true; + } } - public void stop() { - if (fileInodeIdCollector != null) { - fileInodeIdCollector.interrupt(); + public void init() { + inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector( + namesystem.getFSDirectory())); + inodeIdCollector.setName("FileInodeIdCollector"); + inodeIdCollector.start(); + } + + public void close() { + if (inodeIdCollector != null) { + inodeIdCollector.interrupt(); + } + } + + class SPSTraverseInfo extends TraverseInfo { + private long startId; + + SPSTraverseInfo(long startId) { + this.startId = startId; + } + + public long getStartId() { + return startId; } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java index b92fe9f38b4..feacd74eb6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index 48d0598cf31..a4372d5cc67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -77,7 +77,8 @@ public class StoragePolicySatisfier implements Runnable { private final BlockStorageMovementNeeded storageMovementNeeded; private final BlockStorageMovementAttemptedItems storageMovementsMonitor; private volatile boolean isRunning = false; - + private int spsWorkMultiplier; + private long blockCount = 0L; /** * Represents the collective analysis status for all blocks. */ @@ -106,7 +107,9 @@ public StoragePolicySatisfier(final Namesystem namesystem, final BlockManager blkManager, Configuration conf) { this.namesystem = namesystem; this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem, - this); + this, conf.getInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT)); this.blockManager = blkManager; this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems( conf.getLong( @@ -117,6 +120,7 @@ public StoragePolicySatisfier(final Namesystem namesystem, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT), storageMovementNeeded, this); + this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf); } /** @@ -143,7 +147,7 @@ public synchronized void start(boolean reconfigStart) { // Ensure that all the previously submitted block movements(if any) have to // be stopped in all datanodes. 
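// The drop command piggybacks on each datanode's next heartbeat response, so stale movement tasks are discarded before new work is scheduled.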
addDropSPSWorkCommandsToAllDNs(); - storageMovementNeeded.start(); + storageMovementNeeded.init(); storagePolicySatisfierThread = new Daemon(this); storagePolicySatisfierThread.setName("StoragePolicySatisfier"); storagePolicySatisfierThread.start(); @@ -164,7 +168,7 @@ public synchronized void disable(boolean forceStop) { return; } - storageMovementNeeded.stop(); + storageMovementNeeded.close(); storagePolicySatisfierThread.interrupt(); this.storageMovementsMonitor.stop(); @@ -268,9 +272,13 @@ public void run() { } } } - // TODO: We can think to make this as configurable later, how frequently - // we want to check block movements. - Thread.sleep(3000); + int numLiveDn = namesystem.getFSDirectory().getBlockManager() + .getDatanodeManager().getNumLiveDataNodes(); + if (storageMovementNeeded.size() == 0 + || blockCount > (numLiveDn * spsWorkMultiplier)) { + Thread.sleep(3000); + blockCount = 0L; + } } catch (Throwable t) { handleException(t); } @@ -380,6 +388,11 @@ private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN( assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(), blockMovingInfos, coordinatorNode); + int count = 0; + for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { + count = count + blkMovingInfo.getSources().length; + } + blockCount = blockCount + count; return status; } @@ -840,7 +853,7 @@ public void clearQueues() { * - file inode/blockcollection id. */ public void satisfyStoragePolicy(Long inodeId) { - //For file rootId and trackId is same + //For file startId and trackId is same storageMovementNeeded.add(new ItemInfo(inodeId, inodeId)); if (LOG.isDebugEnabled()) { LOG.debug("Added track info for inode {} to block " @@ -864,19 +877,19 @@ public void clearQueue(long trackId) { * policy. */ public static class ItemInfo { - private long rootId; + private long startId; private long trackId; - public ItemInfo(long rootId, long trackId) { - this.rootId = rootId; + public ItemInfo(long startId, long trackId) { + this.startId = startId; this.trackId = trackId; } /** - * Return the root of the current track Id. + * Return the start inode id of the current track Id. */ - public long getRootId() { - return rootId; + public long getStartId() { + return startId; } /** @@ -890,7 +903,7 @@ public long getTrackId() { * Returns true if the tracking path is a directory, false otherwise. */ public boolean isDir() { - return (rootId != trackId); + return (startId != trackId); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 17f7795b0f4..41a74a78433 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4509,6 +4509,29 @@ + + dfs.storage.policy.satisfier.queue.limit + 1000 + + Storage policy satisfier queue size. This queue contains the currently + scheduled file's inode ID for statisfy the policy. + Default value is 1000. + + + + + dfs.storage.policy.satisfier.work.multiplier.per.iteration + 1 + + *Note*: Advanced property. Change with caution. + This determines the total amount of block transfers to begin in + one iteration, for satisfy the policy. The actual number is obtained by + multiplying this multiplier with the total number of live nodes in the + cluster. The result number is the number of blocks to begin transfers + immediately. This number can be any positive, non-zero integer. 
+ + + dfs.storage.policy.satisfier.recheck.timeout.millis 300000 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md index f6bbd10348f..c8a946656f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md @@ -112,7 +112,7 @@ SPS can be enabled and disabled dynamically without restarting the Namenode. Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HDFS-10285)](https://issues.apache.org/jira/browse/HDFS-10285) -* **Note**: When user invokes `satisfyStoragePolicy()` API on a directory, SPS will consider the files which are immediate to that directory. Sub-directories won't be considered for satisfying the policy. Its user responsibility to call this API on directories recursively, to track all files under the sub tree. +* **Note**: When a user invokes the `satisfyStoragePolicy()` API on a directory, SPS recursively scans all sub-directories and considers every file under that directory for satisfying the policy. * HdfsAdmin API : `public void satisfyStoragePolicy(final Path path) throws IOException` @@ -214,7 +214,6 @@ Get the storage policy of a file or a directory. ### Satisfy Storage Policy Schedule blocks to move based on file's/directory's current storage policy. -Note: For directory case, it will consider immediate files under that directory and it will not consider sub directories recursively. * Command: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java index 55ebf9c3e1d..79188217055 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java @@ -41,7 +41,7 @@ public class TestBlockStorageMovementAttemptedItems { public void setup() throws Exception { unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded( Mockito.mock(Namesystem.class), - Mockito.mock(StoragePolicySatisfier.class)); + Mockito.mock(StoragePolicySatisfier.class), 100); StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, selfRetryTimeout, unsatisfiedStorageMovementFiles, sps); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java index e7b91484edd..5bce296b862 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java @@ -191,7 +191,7 @@ public void testWithCheckpoint() throws Exception { DFSTestUtil.waitExpectedStorageType( parentFileName, StorageType.ARCHIVE, 3, timeout, fs); DFSTestUtil.waitExpectedStorageType( - childFileName, StorageType.DEFAULT, 3, timeout, fs); + childFileName, StorageType.ARCHIVE, 3, timeout, fs); } finally { clusterShutdown(); } @@ -232,7 +232,9 @@ public
void testWithHA() throws Exception { DFSTestUtil.waitExpectedStorageType( parentFileName, StorageType.ARCHIVE, 2, timeout, fs); DFSTestUtil.waitExpectedStorageType( - childFileName, StorageType.DEFAULT, 3, timeout, fs); + childFileName, StorageType.DISK, 1, timeout, fs); + DFSTestUtil.waitExpectedStorageType( + childFileName, StorageType.ARCHIVE, 2, timeout, fs); } finally { clusterShutdown(); } @@ -269,7 +271,7 @@ public void testWithRestarts() throws Exception { DFSTestUtil.waitExpectedStorageType( parentFileName, StorageType.ARCHIVE, 3, timeout, fs); DFSTestUtil.waitExpectedStorageType( - childFileName, StorageType.DEFAULT, 3, timeout, fs); + childFileName, StorageType.ARCHIVE, 3, timeout, fs); } finally { clusterShutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java index 33755901742..57e9f94455e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java @@ -21,6 +21,9 @@ import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.slf4j.LoggerFactory.getLogger; import java.io.FileNotFoundException; import java.io.IOException; @@ -61,8 +64,10 @@ import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; import com.google.common.base.Supplier; @@ -71,6 +76,12 @@ * moved and finding its suggested target locations to move. */ public class TestStoragePolicySatisfier { + + { + GenericTestUtils.setLogLevel( + getLogger(FSTreeTraverser.class), Level.DEBUG); + } + private static final String ONE_SSD = "ONE_SSD"; private static final String COLD = "COLD"; private static final Logger LOG = @@ -341,7 +352,9 @@ public void testSatisfyDirWithHdfsAdmin() throws Exception { // take no effect for the sub-dir's file in the directory. DFSTestUtil.waitExpectedStorageType( - subFile2, StorageType.DEFAULT, 3, 30000, dfs); + subFile2, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + subFile2, StorageType.DISK, 2, 30000, dfs); } finally { shutdownCluster(); } @@ -1083,6 +1096,368 @@ public void testSPSWhenFileHasExcessRedundancyBlocks() throws Exception { } } + /** + * Test SPS for empty directory, xAttr should be removed. + */ + @Test(timeout = 300000) + public void testSPSForEmptyDirectory() throws IOException, TimeoutException, + InterruptedException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + Path emptyDir = new Path("/emptyDir"); + fs.mkdirs(emptyDir); + fs.satisfyStoragePolicy(emptyDir); + // Make sure satisfy xattr has been removed. 
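+ // An empty directory produces no ItemInfo entries, so the collector removes the xattr as soon as its scan completes.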
+ DFSTestUtil.waitForXattrRemoved("/emptyDir", + XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test SPS for not exist directory. + */ + @Test(timeout = 300000) + public void testSPSForNonExistDirectory() throws Exception { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + Path emptyDir = new Path("/emptyDir"); + try { + fs.satisfyStoragePolicy(emptyDir); + fail("FileNotFoundException should throw"); + } catch (FileNotFoundException e) { + // nothing to do + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test SPS for directory tree which doesn't have files. + */ + @Test(timeout = 300000) + public void testSPSWithDirectoryTreeWithoutFile() throws Exception { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + // Create directories + /* + * root + * | + * A--------C--------D + * | + * G----H----I + * | + * O + */ + DistributedFileSystem fs = cluster.getFileSystem(); + fs.mkdirs(new Path("/root/C/H/O")); + fs.mkdirs(new Path("/root/A")); + fs.mkdirs(new Path("/root/D")); + fs.mkdirs(new Path("/root/C/G")); + fs.mkdirs(new Path("/root/C/I")); + fs.satisfyStoragePolicy(new Path("/root")); + // Make sure satisfy xattr has been removed. + DFSTestUtil.waitForXattrRemoved("/root", + XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test SPS for directory which has multilevel directories. + */ + @Test(timeout = 300000) + public void testMultipleLevelDirectoryForSatisfyStoragePolicy() + throws Exception { + try { + StorageType[][] diskTypes = new StorageType[][] { + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.SSD}, + {StorageType.DISK, StorageType.DISK}}; + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + hdfsCluster = startCluster(config, diskTypes, diskTypes.length, + storagesPerDatanode, capacity); + dfs = hdfsCluster.getFileSystem(); + createDirectoryTree(dfs); + + List files = getDFSListOfTree(); + dfs.setStoragePolicy(new Path("/root"), COLD); + dfs.satisfyStoragePolicy(new Path("/root")); + for (String fileName : files) { + // Wait till the block is moved to ARCHIVE + DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2, + 30000, dfs); + } + } finally { + shutdownCluster(); + } + } + + /** + * Test SPS for batch processing. 
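+ * With the queue limit set to 5, the traverser must submit the children of /root in multiple batches and wait for free slots in between.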
+ */ + @Test(timeout = 300000) + public void testBatchProcessingForSPSDirectory() throws Exception { + try { + StorageType[][] diskTypes = new StorageType[][] { + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.SSD}, + {StorageType.DISK, StorageType.DISK}}; + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + // Set queue max capacity + config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, + 5); + hdfsCluster = startCluster(config, diskTypes, diskTypes.length, + storagesPerDatanode, capacity); + dfs = hdfsCluster.getFileSystem(); + createDirectoryTree(dfs); + List files = getDFSListOfTree(); + LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory + .getLog(FSTreeTraverser.class)); + + dfs.setStoragePolicy(new Path("/root"), COLD); + dfs.satisfyStoragePolicy(new Path("/root")); + for (String fileName : files) { + // Wait till the block is moved to ARCHIVE + DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2, + 30000, dfs); + } + waitForBlocksMovementResult(files.size(), 30000); + String expectedLogMessage = "StorageMovementNeeded queue remaining" + + " capacity is zero"; + assertTrue("Log output does not contain expected log message: " + + expectedLogMessage, logs.getOutput().contains(expectedLogMessage)); + } finally { + shutdownCluster(); + } + } + + + /** + * Test traverse when parent got deleted. + * 1. Delete /root when traversing Q + * 2. U, R, S should not be in queued. + */ + @Test + public void testTraverseWhenParentDeleted() throws Exception { + StorageType[][] diskTypes = new StorageType[][] { + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.SSD}, + {StorageType.DISK, StorageType.DISK}}; + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + hdfsCluster = startCluster(config, diskTypes, diskTypes.length, + storagesPerDatanode, capacity); + dfs = hdfsCluster.getFileSystem(); + createDirectoryTree(dfs); + + List expectedTraverseOrder = getDFSListOfTree(); + + //Remove files which will not be traverse when parent is deleted + expectedTraverseOrder.remove("/root/D/L/R"); + expectedTraverseOrder.remove("/root/D/L/S"); + expectedTraverseOrder.remove("/root/D/L/Q/U"); + FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory(); + + //Queue limit can control the traverse logic to wait for some free + //entry in queue. After 10 files, traverse control will be on U. + StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); + Mockito.when(sps.isRunning()).thenReturn(true); + BlockStorageMovementNeeded movmentNeededQueue = + new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10); + INode rootINode = fsDir.getINode("/root"); + movmentNeededQueue.addToPendingDirQueue(rootINode.getId()); + movmentNeededQueue.init(); + + //Wait for thread to reach U. + Thread.sleep(1000); + + dfs.delete(new Path("/root/D/L"), true); + + // Remove 10 element and make queue free, So other traversing will start. 
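+ // Each get() drains one ItemInfo and frees a queue slot, letting the blocked traverser resume.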
+ for (int i = 0; i < 10; i++) { + String path = expectedTraverseOrder.remove(0); + long trackId = movmentNeededQueue.get().getTrackId(); + INode inode = fsDir.getInode(trackId); + assertTrue("Failed to traverse tree, expected " + path + " but got " + + inode.getFullPathName(), path.equals(inode.getFullPathName())); + } + //Wait to finish tree traverse + Thread.sleep(5000); + + // Check other element traversed in order and R,S should not be added in + // queue which we already removed from expected list + for (String path : expectedTraverseOrder) { + long trackId = movmentNeededQueue.get().getTrackId(); + INode inode = fsDir.getInode(trackId); + assertTrue("Failed to traverse tree, expected " + path + " but got " + + inode.getFullPathName(), path.equals(inode.getFullPathName())); + } + dfs.delete(new Path("/root"), true); + } + + /** + * Test traverse when root parent got deleted. + * 1. Delete L when traversing Q + * 2. E, M, U, R, S should not be in queued. + */ + @Test + public void testTraverseWhenRootParentDeleted() throws Exception { + StorageType[][] diskTypes = new StorageType[][] { + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.SSD}, + {StorageType.DISK, StorageType.DISK}}; + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + hdfsCluster = startCluster(config, diskTypes, diskTypes.length, + storagesPerDatanode, capacity); + dfs = hdfsCluster.getFileSystem(); + createDirectoryTree(dfs); + + List expectedTraverseOrder = getDFSListOfTree(); + + // Remove files which will not be traverse when parent is deleted + expectedTraverseOrder.remove("/root/D/L/R"); + expectedTraverseOrder.remove("/root/D/L/S"); + expectedTraverseOrder.remove("/root/D/L/Q/U"); + expectedTraverseOrder.remove("/root/D/M"); + expectedTraverseOrder.remove("/root/E"); + FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory(); + StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); + Mockito.when(sps.isRunning()).thenReturn(true); + // Queue limit can control the traverse logic to wait for some free + // entry in queue. After 10 files, traverse control will be on U. + BlockStorageMovementNeeded movmentNeededQueue = + new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10); + movmentNeededQueue.init(); + INode rootINode = fsDir.getINode("/root"); + movmentNeededQueue.addToPendingDirQueue(rootINode.getId()); + // Wait for thread to reach U. + Thread.sleep(1000); + + dfs.delete(new Path("/root/D/L"), true); + + // Remove 10 element and make queue free, So other traversing will start. 
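+ // Draining ten entries frees queue capacity, so the traverser paused at U can continue with whatever remains after the delete.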
+ for (int i = 0; i < 10; i++) { + String path = expectedTraverseOrder.remove(0); + long trackId = movmentNeededQueue.get().getTrackId(); + INode inode = fsDir.getInode(trackId); + assertTrue("Failed to traverse tree, expected " + path + " but got " + + inode.getFullPathName(), path.equals(inode.getFullPathName())); + } + // Wait to finish tree traverse + Thread.sleep(5000); + + // Check other element traversed in order and E, M, U, R, S should not be + // added in queue which we already removed from expected list + for (String path : expectedTraverseOrder) { + long trackId = movmentNeededQueue.get().getTrackId(); + INode inode = fsDir.getInode(trackId); + assertTrue("Failed to traverse tree, expected " + path + " but got " + + inode.getFullPathName(), path.equals(inode.getFullPathName())); + } + dfs.delete(new Path("/root"), true); + } + + private static void createDirectoryTree(DistributedFileSystem dfs) + throws Exception { + // tree structure + /* + * root + * | + * A--------B--------C--------D--------E + * | | + * F----G----H----I J----K----L----M + * | | + * N----O----P Q----R----S + * | | + * T U + */ + // create root Node and child + dfs.mkdirs(new Path("/root")); + DFSTestUtil.createFile(dfs, new Path("/root/A"), 1024, (short) 3, 0); + dfs.mkdirs(new Path("/root/B")); + DFSTestUtil.createFile(dfs, new Path("/root/C"), 1024, (short) 3, 0); + dfs.mkdirs(new Path("/root/D")); + DFSTestUtil.createFile(dfs, new Path("/root/E"), 1024, (short) 3, 0); + + // Create /root/B child + DFSTestUtil.createFile(dfs, new Path("/root/B/F"), 1024, (short) 3, 0); + dfs.mkdirs(new Path("/root/B/G")); + DFSTestUtil.createFile(dfs, new Path("/root/B/H"), 1024, (short) 3, 0); + DFSTestUtil.createFile(dfs, new Path("/root/B/I"), 1024, (short) 3, 0); + + // Create /root/D child + DFSTestUtil.createFile(dfs, new Path("/root/D/J"), 1024, (short) 3, 0); + DFSTestUtil.createFile(dfs, new Path("/root/D/K"), 1024, (short) 3, 0); + dfs.mkdirs(new Path("/root/D/L")); + DFSTestUtil.createFile(dfs, new Path("/root/D/M"), 1024, (short) 3, 0); + + // Create /root/B/G child + DFSTestUtil.createFile(dfs, new Path("/root/B/G/N"), 1024, (short) 3, 0); + DFSTestUtil.createFile(dfs, new Path("/root/B/G/O"), 1024, (short) 3, 0); + dfs.mkdirs(new Path("/root/B/G/P")); + + // Create /root/D/L child + dfs.mkdirs(new Path("/root/D/L/Q")); + DFSTestUtil.createFile(dfs, new Path("/root/D/L/R"), 1024, (short) 3, 0); + DFSTestUtil.createFile(dfs, new Path("/root/D/L/S"), 1024, (short) 3, 0); + + // Create /root/B/G/P child + DFSTestUtil.createFile(dfs, new Path("/root/B/G/P/T"), 1024, (short) 3, 0); + + // Create /root/D/L/Q child + DFSTestUtil.createFile(dfs, new Path("/root/D/L/Q/U"), 1024, (short) 3, 0); + } + + private List getDFSListOfTree() { + List dfsList = new ArrayList<>(); + dfsList.add("/root/A"); + dfsList.add("/root/B/F"); + dfsList.add("/root/B/G/N"); + dfsList.add("/root/B/G/O"); + dfsList.add("/root/B/G/P/T"); + dfsList.add("/root/B/H"); + dfsList.add("/root/B/I"); + dfsList.add("/root/C"); + dfsList.add("/root/D/J"); + dfsList.add("/root/D/K"); + dfsList.add("/root/D/L/Q/U"); + dfsList.add("/root/D/L/R"); + dfsList.add("/root/D/L/S"); + dfsList.add("/root/D/M"); + dfsList.add("/root/E"); + return dfsList; + } + private String createFileAndSimulateFavoredNodes(int favoredNodesCount) throws IOException { ArrayList dns = hdfsCluster.getDataNodes();