diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java index e4283c6ca15..d9367fb2263 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java @@ -265,7 +265,7 @@ public class NamenodeProtocolServerSideTranslatorPB implements RpcController controller, GetNextSPSPathRequestProto request) throws ServiceException { try { - String nextSPSPath = impl.getNextSPSPath(); + Long nextSPSPath = impl.getNextSPSPath(); if (nextSPSPath == null) { return GetNextSPSPathResponseProto.newBuilder().build(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java index 97dee9b3f3c..3bd5986de62 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -267,7 +267,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol, } @Override - public String getNextSPSPath() throws IOException { + public Long getNextSPSPath() throws IOException { GetNextSPSPathRequestProto req = GetNextSPSPathRequestProto.newBuilder().build(); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 42e246cf9c8..bae6b4e3b34 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -3897,7 +3897,7 @@ public class BlockManager implements BlockStatsMXBean { private void notifyStorageMovementAttemptFinishedBlk( DatanodeStorageInfo storageInfo, Block block) { if (getSPSManager() != null) { - SPSService sps = getSPSManager().getInternalSPSService(); + SPSService sps = getSPSManager().getInternalSPSService(); if (sps.isRunning()) { sps.notifyStorageMovementAttemptFinishedBlk( storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index e1ceecda2e8..afe90923bcb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3202,17 +3202,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return stat; } - @Override - public String getFilePath(Long inodeId) { - readLock(); - try { - INode inode = getFSDirectory().getInode(inodeId); - return inode == null ? null : inode.getFullPathName(); - } finally { - readUnlock(); - } - } - /** * Returns true if the file is closed */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 57e827d08fc..2f3325f8235 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -2569,7 +2569,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { } @Override - public String getNextSPSPath() throws IOException { + public Long getNextSPSPath() throws IOException { checkNNStartup(); String operationName = "getNextSPSPath"; namesystem.checkSuperuserPrivilege(operationName); @@ -2589,10 +2589,6 @@ public class NameNodeRpcServer implements NamenodeProtocols { throw new IOException("SPS service mode is " + spsMode + ", so " + "external SPS service is not allowed to fetch the path Ids"); } - Long pathId = spsMgr.getNextPathId(); - if (pathId == null) { - return null; - } - return namesystem.getFilePath(pathId); + return namesystem.getBlockManager().getSPSManager().getNextPathId(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java index fc933b76e9c..82af4d234b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java @@ -77,13 +77,4 @@ public interface Namesystem extends RwLock, SafeMode { */ HdfsFileStatus getFileInfo(String filePath, boolean resolveLink, boolean needLocation) throws IOException; - - /** - * Gets the file path corresponds to the given file id. - * - * @param inodeId - * file id - * @return string file path - */ - String getFilePath(Long inodeId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java index 5b25491eff2..df4f0dddb49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java @@ -52,13 +52,8 @@ import com.google.common.annotations.VisibleForTesting; * entries from tracking. If there is no DN reports about movement attempt * finished for a longer time period, then such items will retries automatically * after timeout. The default timeout would be 5 minutes. - * - * @param - * is identifier of inode or full path name of inode. Internal sps will - * use the file inodeId for the block movement. External sps will use - * file string path representation for the block movement. */ -public class BlockStorageMovementAttemptedItems { +public class BlockStorageMovementAttemptedItems { private static final Logger LOG = LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class); @@ -66,14 +61,14 @@ public class BlockStorageMovementAttemptedItems { * A map holds the items which are already taken for blocks movements * processing and sent to DNs. */ - private final List> storageMovementAttemptedItems; + private final List storageMovementAttemptedItems; private Map> scheduledBlkLocs; // Maintains separate Queue to keep the movement finished blocks. This Q // is used to update the storageMovementAttemptedItems list asynchronously. private final BlockingQueue movementFinishedBlocks; private volatile boolean monitorRunning = true; private Daemon timerThread = null; - private BlockMovementListener blkMovementListener; + private final Context context; // // It might take anywhere between 5 to 10 minutes before // a request is timed out. @@ -85,12 +80,12 @@ public class BlockStorageMovementAttemptedItems { // a request is timed out. // private long minCheckTimeout = 1 * 60 * 1000; // minimum value - private BlockStorageMovementNeeded blockStorageMovementNeeded; - private final SPSService service; + private BlockStorageMovementNeeded blockStorageMovementNeeded; + private final SPSService service; - public BlockStorageMovementAttemptedItems(SPSService service, - BlockStorageMovementNeeded unsatisfiedStorageMovementFiles, - BlockMovementListener blockMovementListener) { + public BlockStorageMovementAttemptedItems(SPSService service, + BlockStorageMovementNeeded unsatisfiedStorageMovementFiles, + Context context) { this.service = service; long recheckTimeout = this.service.getConf().getLong( DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, @@ -106,19 +101,27 @@ public class BlockStorageMovementAttemptedItems { storageMovementAttemptedItems = new ArrayList<>(); scheduledBlkLocs = new HashMap<>(); movementFinishedBlocks = new LinkedBlockingQueue<>(); - this.blkMovementListener = blockMovementListener; + this.context = context; } /** * Add item to block storage movement attempted items map which holds the * tracking/blockCollection id versus time stamp. * - * @param itemInfo - * - tracking info + * @param startPathId + * - start satisfier path identifier + * @param fileId + * - file identifier + * @param monotonicNow + * - time now + * @param assignedBlocks + * - assigned blocks for block movement + * @param retryCount + * - retry count */ - public void add(T startPath, T file, long monotonicNow, + public void add(long startPathId, long fileId, long monotonicNow, Map> assignedBlocks, int retryCount) { - AttemptedItemInfo itemInfo = new AttemptedItemInfo(startPath, file, + AttemptedItemInfo itemInfo = new AttemptedItemInfo(startPathId, fileId, monotonicNow, assignedBlocks.keySet(), retryCount); synchronized (storageMovementAttemptedItems) { storageMovementAttemptedItems.add(itemInfo); @@ -161,11 +164,9 @@ public class BlockStorageMovementAttemptedItems { boolean foundType = dn.getStorageType().equals(type); if (foundDn && foundType) { blkLocs.remove(dn); - // listener if it is plugged-in - if (blkMovementListener != null) { - blkMovementListener - .notifyMovementTriedBlocks(new Block[] {reportedBlock}); - } + Block[] mFinishedBlocks = new Block[1]; + mFinishedBlocks[0] = reportedBlock; + context.notifyMovementTriedBlocks(mFinishedBlocks); // All the block locations has reported. if (blkLocs.size() <= 0) { movementFinishedBlocks.add(reportedBlock); @@ -244,15 +245,15 @@ public class BlockStorageMovementAttemptedItems { @VisibleForTesting void blocksStorageMovementUnReportedItemsCheck() { synchronized (storageMovementAttemptedItems) { - Iterator> iter = storageMovementAttemptedItems + Iterator iter = storageMovementAttemptedItems .iterator(); long now = monotonicNow(); while (iter.hasNext()) { - AttemptedItemInfo itemInfo = iter.next(); + AttemptedItemInfo itemInfo = iter.next(); if (now > itemInfo.getLastAttemptedOrReportedTime() + selfRetryTimeout) { - T file = itemInfo.getFile(); - ItemInfo candidate = new ItemInfo(itemInfo.getStartPath(), file, + long file = itemInfo.getFile(); + ItemInfo candidate = new ItemInfo(itemInfo.getStartPath(), file, itemInfo.getRetryCount() + 1); blockStorageMovementNeeded.add(candidate); iter.remove(); @@ -272,13 +273,13 @@ public class BlockStorageMovementAttemptedItems { // Update attempted items list for (Block blk : finishedBlks) { synchronized (storageMovementAttemptedItems) { - Iterator> iterator = storageMovementAttemptedItems + Iterator iterator = storageMovementAttemptedItems .iterator(); while (iterator.hasNext()) { - AttemptedItemInfo attemptedItemInfo = iterator.next(); + AttemptedItemInfo attemptedItemInfo = iterator.next(); attemptedItemInfo.getBlocks().remove(blk); if (attemptedItemInfo.getBlocks().isEmpty()) { - blockStorageMovementNeeded.add(new ItemInfo( + blockStorageMovementNeeded.add(new ItemInfo( attemptedItemInfo.getStartPath(), attemptedItemInfo.getFile(), attemptedItemInfo.getRetryCount() + 1)); iterator.remove(); @@ -309,15 +310,4 @@ public class BlockStorageMovementAttemptedItems { scheduledBlkLocs.clear(); } } - - /** - * Sets external listener for testing. - * - * @param blkMoveListener - * block movement listener callback object - */ - @VisibleForTesting - void setBlockMovementListener(BlockMovementListener blkMoveListener) { - this.blkMovementListener = blkMoveListener; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java index a194876c76c..c95dcda9793 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java @@ -43,47 +43,38 @@ import com.google.common.annotations.VisibleForTesting; * schedule the block collection IDs for movement. It track the info of * scheduled items and remove the SPS xAttr from the file/Directory once * movement is success. - * - * @param - * is identifier of inode or full path name of inode. Internal sps will - * use the file inodeId for the block movement. External sps will use - * file string path representation for the block movement. */ @InterfaceAudience.Private -public class BlockStorageMovementNeeded { +public class BlockStorageMovementNeeded { public static final Logger LOG = LoggerFactory.getLogger(BlockStorageMovementNeeded.class); - private final Queue> storageMovementNeeded = - new LinkedList>(); + private final Queue storageMovementNeeded = + new LinkedList(); /** * Map of startPath and number of child's. Number of child's indicate the * number of files pending to satisfy the policy. */ - private final Map pendingWorkForDirectory = + private final Map pendingWorkForDirectory = new HashMap<>(); - private final Map spsStatus = + private final Map spsStatus = new ConcurrentHashMap<>(); - private final Context ctxt; + private final Context ctxt; private Daemon pathIdCollector; - private FileCollector fileCollector; - private SPSPathIdProcessor pathIDProcessor; // Amount of time to cache the SUCCESS status of path before turning it to // NOT_AVAILABLE. private static long statusClearanceElapsedTimeMs = 300000; - public BlockStorageMovementNeeded(Context context, - FileCollector fileCollector) { + public BlockStorageMovementNeeded(Context context) { this.ctxt = context; - this.fileCollector = fileCollector; pathIDProcessor = new SPSPathIdProcessor(); } @@ -94,7 +85,7 @@ public class BlockStorageMovementNeeded { * @param trackInfo * - track info for satisfy the policy */ - public synchronized void add(ItemInfo trackInfo) { + public synchronized void add(ItemInfo trackInfo) { spsStatus.put(trackInfo.getFile(), new StoragePolicySatisfyPathStatusInfo( StoragePolicySatisfyPathStatus.IN_PROGRESS)); @@ -114,7 +105,7 @@ public class BlockStorageMovementNeeded { * scan. */ @VisibleForTesting - public synchronized void addAll(T startPath, List> itemInfoList, + public synchronized void addAll(long startPath, List itemInfoList, boolean scanCompleted) { storageMovementNeeded.addAll(itemInfoList); updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted); @@ -131,7 +122,7 @@ public class BlockStorageMovementNeeded { * elements to scan. */ @VisibleForTesting - public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) { + public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) { storageMovementNeeded.add(itemInfo); // This represents sps start id is file, so no need to update pending dir // stats. @@ -141,7 +132,7 @@ public class BlockStorageMovementNeeded { updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted); } - private void updatePendingDirScanStats(T startPath, int numScannedFiles, + private void updatePendingDirScanStats(long startPath, int numScannedFiles, boolean scanCompleted) { DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath); if (pendingWork == null) { @@ -160,7 +151,7 @@ public class BlockStorageMovementNeeded { * * @return satisfier files */ - public synchronized ItemInfo get() { + public synchronized ItemInfo get() { return storageMovementNeeded.poll(); } @@ -181,12 +172,12 @@ public class BlockStorageMovementNeeded { * Decrease the pending child count for directory once one file blocks moved * successfully. Remove the SPS xAttr if pending child count is zero. */ - public synchronized void removeItemTrackInfo(ItemInfo trackInfo, + public synchronized void removeItemTrackInfo(ItemInfo trackInfo, boolean isSuccess) throws IOException { if (trackInfo.isDir()) { // If track is part of some start inode then reduce the pending // directory work count. - T startId = trackInfo.getStartPath(); + long startId = trackInfo.getStartPath(); if (!ctxt.isFileExist(startId)) { // directory deleted just remove it. this.pendingWorkForDirectory.remove(startId); @@ -212,11 +203,11 @@ public class BlockStorageMovementNeeded { } } - public synchronized void clearQueue(T trackId) { + public synchronized void clearQueue(long trackId) { ctxt.removeSPSPathId(trackId); - Iterator> iterator = storageMovementNeeded.iterator(); + Iterator iterator = storageMovementNeeded.iterator(); while (iterator.hasNext()) { - ItemInfo next = iterator.next(); + ItemInfo next = iterator.next(); if (next.getFile() == trackId) { iterator.remove(); } @@ -227,7 +218,7 @@ public class BlockStorageMovementNeeded { /** * Mark inode status as SUCCESS in map. */ - private void updateStatus(T startId, boolean isSuccess){ + private void updateStatus(long startId, boolean isSuccess){ StoragePolicySatisfyPathStatusInfo spsStatusInfo = spsStatus.get(startId); if (spsStatusInfo == null) { @@ -249,7 +240,7 @@ public class BlockStorageMovementNeeded { */ public synchronized void clearQueuesWithNotification() { // Remove xAttr from directories - T trackId; + Long trackId; while ((trackId = ctxt.getNextSPSPath()) != null) { try { // Remove xAttr for file @@ -261,7 +252,7 @@ public class BlockStorageMovementNeeded { // File's directly added to storageMovementNeeded, So try to remove // xAttr for file - ItemInfo itemInfo; + ItemInfo itemInfo; while ((itemInfo = get()) != null) { try { // Remove xAttr for file @@ -287,7 +278,7 @@ public class BlockStorageMovementNeeded { public void run() { LOG.info("Starting SPSPathIdProcessor!."); long lastStatusCleanTime = 0; - T startINode = null; + Long startINode = null; while (ctxt.isRunning()) { try { if (!ctxt.isInSafeMode()) { @@ -301,7 +292,7 @@ public class BlockStorageMovementNeeded { spsStatus.put(startINode, new StoragePolicySatisfyPathStatusInfo( StoragePolicySatisfyPathStatus.IN_PROGRESS)); - fileCollector.scanAndCollectFiles(startINode); + ctxt.scanAndCollectFiles(startINode); // check if directory was empty and no child added to queue DirPendingWorkInfo dirPendingWorkInfo = pendingWorkForDirectory.get(startINode); @@ -339,9 +330,9 @@ public class BlockStorageMovementNeeded { } private synchronized void cleanSPSStatus() { - for (Iterator> it = spsStatus - .entrySet().iterator(); it.hasNext();) { - Entry entry = it.next(); + for (Iterator> it = + spsStatus.entrySet().iterator(); it.hasNext();) { + Entry entry = it.next(); if (entry.getValue().canRemove()) { it.remove(); } @@ -477,7 +468,7 @@ public class BlockStorageMovementNeeded { return statusClearanceElapsedTimeMs; } - public void markScanCompletedForDir(T inode) { + public void markScanCompletedForDir(long inode) { DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode); if (pendingWork != null) { pendingWork.markScanCompleted(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java index 55a1f7a08a2..d538374241c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java @@ -24,24 +24,21 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.AccessControlException; /** * An interface for the communication between SPS and Namenode module. - * - * @param - * is identifier of inode or full path name of inode. Internal sps will - * use the file inodeId for the block movement. External sps will use - * file string path representation for the block movement. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface Context { +public interface Context { /** * Returns true if the SPS is running, false otherwise. @@ -85,7 +82,7 @@ public interface Context { * - file info * @return true if the given file exists, false otherwise. */ - boolean isFileExist(T filePath); + boolean isFileExist(long filePath); /** * Gets the storage policy details for the given policy ID. @@ -108,7 +105,7 @@ public interface Context { * - user invoked satisfier path * @throws IOException */ - void removeSPSHint(T spsPath) throws IOException; + void removeSPSHint(long spsPath) throws IOException; /** * Gets the number of live datanodes in the cluster. @@ -124,7 +121,7 @@ public interface Context { * file path * @return file status metadata information */ - HdfsFileStatus getFileInfo(T file) throws IOException; + HdfsFileStatus getFileInfo(long file) throws IOException; /** * Returns all the live datanodes and its storage details. @@ -137,15 +134,41 @@ public interface Context { /** * @return next SPS path info to process. */ - T getNextSPSPath(); + Long getNextSPSPath(); /** * Removes the SPS path id. */ - void removeSPSPathId(T pathId); + void removeSPSPathId(long pathId); /** * Removes all SPS path ids. */ void removeAllSPSPathIds(); + + /** + * Do scan and collects the files under that directory and adds to the given + * BlockStorageMovementNeeded. + * + * @param filePath + * file path + */ + void scanAndCollectFiles(long filePath) + throws IOException, InterruptedException; + + /** + * Handles the block move tasks. BlockMovingInfo must contain the required + * info to move the block, that source location, destination location and + * storage types. + */ + void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException; + + /** + * This can be used to notify to the SPS about block movement attempt + * finished. Then SPS will re-check whether it needs retry or not. + * + * @param moveAttemptFinishedBlks + * list of movement attempt finished blocks + */ + void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java index 3531ecde7bd..d4e514b8132 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java @@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory; * interval. */ @InterfaceAudience.Private -public class DatanodeCacheManager { +public class DatanodeCacheManager { private static final Logger LOG = LoggerFactory .getLogger(DatanodeCacheManager.class); @@ -78,7 +78,7 @@ public class DatanodeCacheManager { * @throws IOException */ public DatanodeMap getLiveDatanodeStorageReport( - Context spsContext) throws IOException { + Context spsContext) throws IOException { long now = Time.monotonicNow(); long elapsedTimeMs = now - lastAccessedTime; boolean refreshNeeded = elapsedTimeMs >= refreshIntervalMs; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java index dceb5fa9dec..fa8b31b5492 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java @@ -26,23 +26,18 @@ import org.apache.hadoop.classification.InterfaceStability; /** * An interface for scanning the directory recursively and collect files * under the given directory. - * - * @param - * is identifier of inode or full path name of inode. Internal sps will - * use the file inodeId for the block movement. External sps will use - * file string path representation for the block movement. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface FileCollector { +public interface FileCollector { /** * This method can be used to scan and collects the files under that * directory and adds to the given BlockStorageMovementNeeded. * - * @param filePath - * - file path + * @param path + * - file path id */ - void scanAndCollectFiles(T filePath) + void scanAndCollectFiles(long path) throws IOException, InterruptedException; } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java index a77fe855449..2bf48103081 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java @@ -20,11 +20,14 @@ package org.apache.hadoop.hdfs.server.namenode.sps; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import java.io.IOException; +import java.util.Arrays; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -32,6 +35,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap; import org.apache.hadoop.net.NetworkTopology; @@ -45,20 +49,26 @@ import org.slf4j.LoggerFactory; * movements to satisfy the storage policy. */ @InterfaceAudience.Private -public class IntraSPSNameNodeContext implements Context { +public class IntraSPSNameNodeContext implements Context { private static final Logger LOG = LoggerFactory .getLogger(IntraSPSNameNodeContext.class); private final Namesystem namesystem; private final BlockManager blockManager; - private SPSService service; + private SPSService service; + private final FileCollector fileCollector; + private final BlockMoveTaskHandler blockMoveTaskHandler; public IntraSPSNameNodeContext(Namesystem namesystem, - BlockManager blockManager, SPSService service) { + BlockManager blockManager, SPSService service) { this.namesystem = namesystem; this.blockManager = blockManager; this.service = service; + fileCollector = new IntraSPSNameNodeFileIdCollector( + namesystem.getFSDirectory(), service); + blockMoveTaskHandler = new IntraSPSNameNodeBlockMoveTaskHandler( + blockManager, namesystem); } @Override @@ -67,17 +77,12 @@ public class IntraSPSNameNodeContext implements Context { } /** - * @return object containing information regarding the file or null if file - * not found. + * @return object containing information regarding the file. */ @Override - public HdfsFileStatus getFileInfo(Long inodeID) throws IOException { - String filePath = namesystem.getFilePath(inodeID); - if (StringUtils.isBlank(filePath)) { - LOG.debug("File with inodeID:{} doesn't exists!", inodeID); - return null; - } - return namesystem.getFileInfo(filePath, true, true); + public HdfsFileStatus getFileInfo(long inodeID) throws IOException { + Path filePath = DFSUtilClient.makePathFromFileId(inodeID); + return namesystem.getFileInfo(filePath.toString(), true, true); } @Override @@ -93,12 +98,12 @@ public class IntraSPSNameNodeContext implements Context { } @Override - public boolean isFileExist(Long inodeId) { + public boolean isFileExist(long inodeId) { return namesystem.getFSDirectory().getInode(inodeId) != null; } @Override - public void removeSPSHint(Long inodeId) throws IOException { + public void removeSPSHint(long inodeId) throws IOException { this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY); } @@ -156,7 +161,7 @@ public class IntraSPSNameNodeContext implements Context { } @Override - public void removeSPSPathId(Long trackId) { + public void removeSPSPathId(long trackId) { blockManager.getSPSManager().removePathId(trackId); } @@ -164,4 +169,21 @@ public class IntraSPSNameNodeContext implements Context { public void removeAllSPSPathIds() { blockManager.getSPSManager().removeAllPathIds(); } + + @Override + public void scanAndCollectFiles(long filePath) + throws IOException, InterruptedException { + fileCollector.scanAndCollectFiles(filePath); + } + + @Override + public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException { + blockMoveTaskHandler.submitMoveTask(blkMovingInfo); + } + + @Override + public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { + LOG.info("Movement attempted blocks: {}", + Arrays.asList(moveAttemptFinishedBlks)); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java index 27d9e7d6f84..ea3b96f9ba2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java @@ -35,16 +35,16 @@ import org.apache.hadoop.hdfs.server.namenode.INode; */ @InterfaceAudience.Private public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser - implements FileCollector { + implements FileCollector { private int maxQueueLimitToScan; - private final SPSService service; + private final SPSService service; private int remainingCapacity = 0; - private List> currentBatch; + private List currentBatch; public IntraSPSNameNodeFileIdCollector(FSDirectory dir, - SPSService service) { + SPSService service) { super(dir); this.service = service; this.maxQueueLimitToScan = service.getConf().getInt( @@ -64,7 +64,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser return false; } if (inode.isFile() && inode.asFile().numBlocks() != 0) { - currentBatch.add(new ItemInfo( + currentBatch.add(new ItemInfo( ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId())); remainingCapacity--; } @@ -120,7 +120,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser } @Override - public void scanAndCollectFiles(final Long startINodeId) + public void scanAndCollectFiles(final long startINodeId) throws IOException, InterruptedException { FSDirectory fsd = getFSDirectory(); INode startInode = fsd.getInode(startINodeId); @@ -131,7 +131,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser } if (startInode.isFile()) { currentBatch - .add(new ItemInfo(startInode.getId(), startInode.getId())); + .add(new ItemInfo(startInode.getId(), startInode.getId())); } else { readLock(); // NOTE: this lock will not be held for full directory scanning. It is diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java index bd8ab92ad0c..949e3fcdc26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java @@ -21,28 +21,26 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; /** - * ItemInfo is a file info object for which need to satisfy the policy. For - * internal satisfier service, it uses inode id which is Long datatype. For the - * external satisfier service, it uses the full string representation of the - * path. + * ItemInfo is a file info object for which need to satisfy the policy. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class ItemInfo { - private T startPath; - private T file; +public class ItemInfo { + private long startPathId; + private long fileId; private int retryCount; - public ItemInfo(T startPath, T file) { - this.startPath = startPath; - this.file = file; + public ItemInfo(long startPathId, long fileId) { + this.startPathId = startPathId; + this.fileId = fileId; // set 0 when item is getting added first time in queue. this.retryCount = 0; } - public ItemInfo(final T startPath, final T file, final int retryCount) { - this.startPath = startPath; - this.file = file; + public ItemInfo(final long startPathId, final long fileId, + final int retryCount) { + this.startPathId = startPathId; + this.fileId = fileId; this.retryCount = retryCount; } @@ -50,22 +48,22 @@ public class ItemInfo { * Returns the start path of the current file. This indicates that SPS * was invoked on this path. */ - public T getStartPath() { - return startPath; + public long getStartPath() { + return startPathId; } /** * Returns the file for which needs to satisfy the policy. */ - public T getFile() { - return file; + public long getFile() { + return fileId; } /** * Returns true if the tracking path is a directory, false otherwise. */ public boolean isDir() { - return !startPath.equals(file); + return !(startPathId == fileId); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java index 5032377b5f2..86634d8f7c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java @@ -29,15 +29,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; /** * An interface for SPSService, which exposes life cycle and processing APIs. - * - * @param - * is identifier of inode or full path name of inode. Internal sps will - * use the file inodeId for the block movement. External sps will use - * file string path representation for the block movement. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface SPSService { +public interface SPSService { /** * Initializes the helper services. @@ -45,16 +40,8 @@ public interface SPSService { * @param ctxt * - context is an helper service to provide communication channel * between NN and SPS - * @param fileCollector - * - a helper service for scanning the files under a given directory - * id - * @param handler - * - a helper service for moving the blocks - * @param blkMovementListener - * - listener to know about block movement attempt completion */ - void init(Context ctxt, FileCollector fileCollector, - BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener); + void init(Context ctxt); /** * Starts the SPS service. Make sure to initialize the helper services before @@ -94,19 +81,19 @@ public interface SPSService { * @param itemInfo * file info object for which need to satisfy the policy */ - void addFileToProcess(ItemInfo itemInfo, boolean scanCompleted); + void addFileToProcess(ItemInfo itemInfo, boolean scanCompleted); /** * Adds all the Item information(file etc) to processing queue. * - * @param startPath - * - directory/file, on which SPS was called. + * @param startPathId + * - directoryId/fileId, on which SPS was called. * @param itemInfoList * - list of item infos * @param scanCompleted * - whether the scanning of directory fully done with itemInfoList */ - void addAllFilesToProcess(T startPath, List> itemInfoList, + void addAllFilesToProcess(long startPathId, List itemInfoList, boolean scanCompleted); /** @@ -117,7 +104,7 @@ public interface SPSService { /** * Clear inodeId present in the processing queue. */ - void clearQueue(T spsPath); + void clearQueue(long spsPath); /** * @return the configuration. @@ -128,9 +115,9 @@ public interface SPSService { * Marks the scanning of directory if finished. * * @param spsPath - * - satisfier path + * - satisfier path id */ - void markScanCompletedForPath(T spsPath); + void markScanCompletedForPath(long spsPath); /** * Given node is reporting that it received a certain movement attempt diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index cbd6001d61e..4af6c8ffb23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -78,20 +78,19 @@ import com.google.common.base.Preconditions; * physical block movements. */ @InterfaceAudience.Private -public class StoragePolicySatisfier implements SPSService, Runnable { +public class StoragePolicySatisfier implements SPSService, Runnable { public static final Logger LOG = LoggerFactory.getLogger(StoragePolicySatisfier.class); private Daemon storagePolicySatisfierThread; - private BlockStorageMovementNeeded storageMovementNeeded; - private BlockStorageMovementAttemptedItems storageMovementsMonitor; + private BlockStorageMovementNeeded storageMovementNeeded; + private BlockStorageMovementAttemptedItems storageMovementsMonitor; private volatile boolean isRunning = false; private int spsWorkMultiplier; private long blockCount = 0L; private int blockMovementMaxRetry; - private Context ctxt; - private BlockMoveTaskHandler blockMoveTaskHandler; + private Context ctxt; private final Configuration conf; - private DatanodeCacheManager dnCacheMgr; + private DatanodeCacheManager dnCacheMgr; public StoragePolicySatisfier(Configuration conf) { this.conf = conf; @@ -137,16 +136,11 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } } - public void init(final Context context, - final FileCollector fileIDCollector, - final BlockMoveTaskHandler blockMovementTaskHandler, - final BlockMovementListener blockMovementListener) { + public void init(final Context context) { this.ctxt = context; - this.storageMovementNeeded = new BlockStorageMovementNeeded(context, - fileIDCollector); - this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems( - this, storageMovementNeeded, blockMovementListener); - this.blockMoveTaskHandler = blockMovementTaskHandler; + this.storageMovementNeeded = new BlockStorageMovementNeeded(context); + this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems( + this, storageMovementNeeded, context); this.spsWorkMultiplier = getSPSWorkMultiplier(getConf()); this.blockMovementMaxRetry = getConf().getInt( DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, @@ -191,7 +185,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { storagePolicySatisfierThread.start(); this.storageMovementsMonitor.start(); this.storageMovementNeeded.activate(); - dnCacheMgr = new DatanodeCacheManager(conf); + dnCacheMgr = new DatanodeCacheManager(conf); } @Override @@ -259,7 +253,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { continue; } try { - ItemInfo itemInfo = null; + ItemInfo itemInfo = null; boolean retryItem = false; if (!ctxt.isInSafeMode()) { itemInfo = storageMovementNeeded.get(); @@ -271,7 +265,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { storageMovementNeeded.removeItemTrackInfo(itemInfo, false); continue; } - T trackId = itemInfo.getFile(); + long trackId = itemInfo.getFile(); BlocksMovingAnalysis status = null; BlockStoragePolicy existingStoragePolicy; // TODO: presently, context internally acquire the lock @@ -353,7 +347,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { blockCount = 0L; } if (retryItem) { - itemInfo.increRetryCount(); + // itemInfo.increRetryCount(); this.storageMovementNeeded.add(itemInfo); } } catch (IOException e) { @@ -469,7 +463,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { // Check for at least one block storage movement has been chosen try { - blockMoveTaskHandler.submitMoveTask(blkMovingInfo); + ctxt.submitMoveTask(blkMovingInfo); LOG.debug("BlockMovingInfo: {}", blkMovingInfo); StorageTypeNodePair nodeStorage = new StorageTypeNodePair( blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget()); @@ -1092,7 +1086,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } @VisibleForTesting - public BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() { + public BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() { return storageMovementsMonitor; } @@ -1109,7 +1103,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { /** * Clear queues for given track id. */ - public void clearQueue(T trackId) { + public void clearQueue(long trackId) { storageMovementNeeded.clearQueue(trackId); } @@ -1118,7 +1112,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { * attempted or reported time stamp. This is used by * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}. */ - final static class AttemptedItemInfo extends ItemInfo { + final static class AttemptedItemInfo extends ItemInfo { private long lastAttemptedOrReportedTime; private final Set blocks; @@ -1136,7 +1130,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { * @param retryCount * file retry count */ - AttemptedItemInfo(T rootId, T trackId, + AttemptedItemInfo(long rootId, long trackId, long lastAttemptedOrReportedTime, Set blocks, int retryCount) { super(rootId, trackId, retryCount); @@ -1179,7 +1173,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } @Override - public void addFileToProcess(ItemInfo trackInfo, boolean scanCompleted) { + public void addFileToProcess(ItemInfo trackInfo, boolean scanCompleted) { storageMovementNeeded.add(trackInfo, scanCompleted); if (LOG.isDebugEnabled()) { LOG.debug("Added track info for inode {} to block " @@ -1188,7 +1182,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } @Override - public void addAllFilesToProcess(T startPath, List> itemInfoList, + public void addAllFilesToProcess(long startPath, List itemInfoList, boolean scanCompleted) { getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted); } @@ -1204,12 +1198,12 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } @VisibleForTesting - public BlockStorageMovementNeeded getStorageMovementQueue() { + public BlockStorageMovementNeeded getStorageMovementQueue() { return storageMovementNeeded; } @Override - public void markScanCompletedForPath(T inodeId) { + public void markScanCompletedForPath(long inodeId) { getStorageMovementQueue().markScanCompletedForDir(inodeId); } @@ -1278,15 +1272,4 @@ public class StoragePolicySatisfier implements SPSService, Runnable { "It should be a positive, non-zero integer value."); return spsWorkMultiplier; } - - /** - * Sets external listener for testing. - * - * @param blkMovementListener - * block movement listener callback object - */ - @VisibleForTesting - void setBlockMovementListener(BlockMovementListener blkMovementListener) { - storageMovementsMonitor.setBlockMovementListener(blkMovementListener); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java index 5ec0372c712..0507d6b64b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java @@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory; public class StoragePolicySatisfyManager { private static final Logger LOG = LoggerFactory .getLogger(StoragePolicySatisfyManager.class); - private final StoragePolicySatisfier spsService; + private final StoragePolicySatisfier spsService; private final boolean storagePolicyEnabled; private volatile StoragePolicySatisfierMode mode; private final Queue pathsToBeTraveresed; @@ -84,7 +84,7 @@ public class StoragePolicySatisfyManager { pathsToBeTraveresed = new LinkedList(); // instantiate SPS service by just keeps config reference and not starting // any supporting threads. - spsService = new StoragePolicySatisfier(conf); + spsService = new StoragePolicySatisfier(conf); this.namesystem = namesystem; this.blkMgr = blkMgr; } @@ -121,10 +121,7 @@ public class StoragePolicySatisfyManager { } // starts internal daemon service inside namenode spsService.init( - new IntraSPSNameNodeContext(namesystem, blkMgr, spsService), - new IntraSPSNameNodeFileIdCollector(namesystem.getFSDirectory(), - spsService), - new IntraSPSNameNodeBlockMoveTaskHandler(blkMgr, namesystem), null); + new IntraSPSNameNodeContext(namesystem, blkMgr, spsService)); spsService.start(false, mode); break; case EXTERNAL: @@ -221,13 +218,8 @@ public class StoragePolicySatisfyManager { mode); return; } - spsService.init( - new IntraSPSNameNodeContext(this.namesystem, this.blkMgr, spsService), - new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(), - spsService), - new IntraSPSNameNodeBlockMoveTaskHandler(this.blkMgr, - this.namesystem), - null); + spsService.init(new IntraSPSNameNodeContext(this.namesystem, this.blkMgr, + spsService)); spsService.start(true, newMode); break; case EXTERNAL: @@ -309,7 +301,7 @@ public class StoragePolicySatisfyManager { /** * @return internal SPS service instance. */ - public SPSService getInternalSPSService() { + public SPSService getInternalSPSService() { return this.spsService; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java index 5ff6ffdfef3..f80477b7f50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java @@ -209,6 +209,6 @@ public interface NamenodeProtocol { * by External SPS. */ @AtMostOnce - String getNextSPSPath() throws IOException; + Long getNextSPSPath() throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java index f5225d21dff..3ea02947106 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java @@ -76,11 +76,11 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { private final SaslDataTransferClient saslClient; private final BlockStorageMovementTracker blkMovementTracker; private Daemon movementTrackerThread; - private final SPSService service; + private final SPSService service; private final BlockDispatcher blkDispatcher; public ExternalSPSBlockMoveTaskHandler(Configuration conf, - NameNodeConnector nnc, SPSService spsService) { + NameNodeConnector nnc, SPSService spsService) { int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); moveExecutor = initializeBlockMoverThreadPool(moverThreads); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java index 1cd46640586..189bc2b39f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.sps; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -27,6 +28,8 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -34,10 +37,14 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler; +import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener; import org.apache.hadoop.hdfs.server.namenode.sps.Context; +import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector; import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.AccessControlException; @@ -49,17 +56,24 @@ import org.slf4j.LoggerFactory; * SPS from Namenode state. */ @InterfaceAudience.Private -public class ExternalSPSContext implements Context { - public static final Logger LOG = - LoggerFactory.getLogger(ExternalSPSContext.class); - private SPSService service; - private NameNodeConnector nnc = null; - private BlockStoragePolicySuite createDefaultSuite = +public class ExternalSPSContext implements Context { + public static final Logger LOG = LoggerFactory + .getLogger(ExternalSPSContext.class); + private final SPSService service; + private final NameNodeConnector nnc; + private final BlockStoragePolicySuite createDefaultSuite = BlockStoragePolicySuite.createDefaultSuite(); + private final FileCollector fileCollector; + private final BlockMoveTaskHandler externalHandler; + private final BlockMovementListener blkMovementListener; - public ExternalSPSContext(SPSService service, NameNodeConnector nnc) { + public ExternalSPSContext(SPSService service, NameNodeConnector nnc) { this.service = service; this.nnc = nnc; + this.fileCollector = new ExternalSPSFilePathCollector(service); + this.externalHandler = new ExternalSPSBlockMoveTaskHandler( + service.getConf(), nnc, service); + this.blkMovementListener = new ExternalBlockMovementListener(); } @Override @@ -119,9 +133,10 @@ public class ExternalSPSContext implements Context { } @Override - public boolean isFileExist(String filePath) { + public boolean isFileExist(long path) { + Path filePath = DFSUtilClient.makePathFromFileId(path); try { - return nnc.getDistributedFileSystem().exists(new Path(filePath)); + return nnc.getDistributedFileSystem().exists(filePath); } catch (IllegalArgumentException | IOException e) { LOG.warn("Exception while getting file is for the given path:{}", filePath, e); @@ -140,8 +155,9 @@ public class ExternalSPSContext implements Context { } @Override - public void removeSPSHint(String inodeId) throws IOException { - nnc.getDistributedFileSystem().removeXAttr(new Path(inodeId), + public void removeSPSHint(long inodeId) throws IOException { + Path filePath = DFSUtilClient.makePathFromFileId(inodeId); + nnc.getDistributedFileSystem().removeXAttr(filePath, HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY); } @@ -157,11 +173,12 @@ public class ExternalSPSContext implements Context { } @Override - public HdfsFileStatus getFileInfo(String path) throws IOException { + public HdfsFileStatus getFileInfo(long path) throws IOException { HdfsLocatedFileStatus fileInfo = null; try { + Path filePath = DFSUtilClient.makePathFromFileId(path); fileInfo = nnc.getDistributedFileSystem().getClient() - .getLocatedFileInfo(path, false); + .getLocatedFileInfo(filePath.toString(), false); } catch (FileNotFoundException e) { LOG.debug("Path:{} doesn't exists!", path, e); } @@ -175,7 +192,7 @@ public class ExternalSPSContext implements Context { } @Override - public String getNextSPSPath() { + public Long getNextSPSPath() { try { return nnc.getNNProtocolConnection().getNextSPSPath(); } catch (IOException e) { @@ -185,7 +202,7 @@ public class ExternalSPSContext implements Context { } @Override - public void removeSPSPathId(String pathId) { + public void removeSPSPathId(long pathId) { // We need not specifically implement for external. } @@ -193,4 +210,40 @@ public class ExternalSPSContext implements Context { public void removeAllSPSPathIds() { // We need not specifically implement for external. } -} + + @Override + public void scanAndCollectFiles(long path) + throws IOException, InterruptedException { + fileCollector.scanAndCollectFiles(path); + } + + @Override + public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException { + externalHandler.submitMoveTask(blkMovingInfo); + } + + @Override + public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { + // External listener if it is plugged-in + if (blkMovementListener != null) { + blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks); + } + } + + /** + * Its an implementation of BlockMovementListener. + */ + private static class ExternalBlockMovementListener + implements BlockMovementListener { + + private List actualBlockMovements = new ArrayList<>(); + + @Override + public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { + for (Block block : moveAttemptFinishedBlks) { + actualBlockMovements.add(block); + } + LOG.info("Movement attempted blocks", actualBlockMovements); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java index 94354751224..611ff659a5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -41,14 +42,14 @@ import org.slf4j.LoggerFactory; * representation. */ @InterfaceAudience.Private -public class ExternalSPSFilePathCollector implements FileCollector { +public class ExternalSPSFilePathCollector implements FileCollector { public static final Logger LOG = LoggerFactory.getLogger(ExternalSPSFilePathCollector.class); private DistributedFileSystem dfs; - private SPSService service; + private SPSService service; private int maxQueueLimitToScan; - public ExternalSPSFilePathCollector(SPSService service) { + public ExternalSPSFilePathCollector(SPSService service) { this.service = service; this.maxQueueLimitToScan = service.getConf().getInt( DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, @@ -72,13 +73,13 @@ public class ExternalSPSFilePathCollector implements FileCollector { * Recursively scan the given path and add the file info to SPS service for * processing. */ - private long processPath(String startID, String childPath) { + private long processPath(Long startID, String childPath) { long pendingWorkCount = 0; // to be satisfied file counter for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) { final DirectoryListing children; try { - children = dfs.getClient().listPaths(childPath, lastReturnedName, - false); + children = dfs.getClient().listPaths(childPath, + lastReturnedName, false); } catch (IOException e) { LOG.warn("Failed to list directory " + childPath + ". Ignore the directory and continue.", e); @@ -93,18 +94,18 @@ public class ExternalSPSFilePathCollector implements FileCollector { } for (HdfsFileStatus child : children.getPartialListing()) { - String childFullPath = child.getFullName(childPath); if (child.isFile()) { - service.addFileToProcess( - new ItemInfo(startID, childFullPath), false); + service.addFileToProcess(new ItemInfo(startID, child.getFileId()), + false); checkProcessingQueuesFree(); pendingWorkCount++; // increment to be satisfied file count } else { + String childFullPathName = child.getFullName(childPath); if (child.isDirectory()) { - if (!childFullPath.endsWith(Path.SEPARATOR)) { - childFullPath = childFullPath + Path.SEPARATOR; + if (!childFullPathName.endsWith(Path.SEPARATOR)) { + childFullPathName = childFullPathName + Path.SEPARATOR; } - pendingWorkCount += processPath(startID, childFullPath); + pendingWorkCount += processPath(startID, childFullPathName); } } } @@ -150,11 +151,12 @@ public class ExternalSPSFilePathCollector implements FileCollector { } @Override - public void scanAndCollectFiles(String path) throws IOException { + public void scanAndCollectFiles(long pathId) throws IOException { if (dfs == null) { dfs = getFS(service.getConf()); } - long pendingSatisfyItemsCount = processPath(path, path); + Path filePath = DFSUtilClient.makePathFromFileId(pathId); + long pendingSatisfyItemsCount = processPath(pathId, filePath.toString()); // Check whether the given path contains any item to be tracked // or the no to be satisfied paths. In case of empty list, add the given // inodeId to the 'pendingWorkForDirectory' with empty list so that later @@ -162,10 +164,10 @@ public class ExternalSPSFilePathCollector implements FileCollector { // this path is already satisfied the storage policy. if (pendingSatisfyItemsCount <= 0) { LOG.debug("There is no pending items to satisfy the given path " - + "inodeId:{}", path); - service.addAllFilesToProcess(path, new ArrayList<>(), true); + + "inodeId:{}", pathId); + service.addAllFilesToProcess(pathId, new ArrayList<>(), true); } else { - service.markScanCompletedForPath(path); + service.markScanCompletedForPath(pathId); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java index 236b887cdc8..af90f0d8ac5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.util.ExitUtil.terminate; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -32,11 +31,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; @@ -68,8 +65,7 @@ public final class ExternalStoragePolicySatisfier { HdfsConfiguration spsConf = new HdfsConfiguration(); // login with SPS keytab secureLogin(spsConf); - StoragePolicySatisfier sps = new StoragePolicySatisfier( - spsConf); + StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf); nnc = getNameNodeConnector(spsConf); boolean spsRunning; @@ -82,12 +78,7 @@ public final class ExternalStoragePolicySatisfier { } ExternalSPSContext context = new ExternalSPSContext(sps, nnc); - ExternalBlockMovementListener blkMoveListener = - new ExternalBlockMovementListener(); - ExternalSPSBlockMoveTaskHandler externalHandler = - new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps); - sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler, - blkMoveListener); + sps.init(context); sps.start(true, StoragePolicySatisfierMode.EXTERNAL); if (sps != null) { sps.join(); @@ -132,21 +123,4 @@ public final class ExternalStoragePolicySatisfier { } } } - - /** - * It is implementation of BlockMovementListener. - */ - private static class ExternalBlockMovementListener - implements BlockMovementListener { - - private List actualBlockMovements = new ArrayList<>(); - - @Override - public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { - for (Block block : moveAttemptFinishedBlks) { - actualBlockMovements.add(block); - } - LOG.info("Movement attempted blocks:{}", actualBlockMovements); - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto index 2acc5a8ef12..89edfbf2ea6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto @@ -218,7 +218,7 @@ message GetNextSPSPathRequestProto { } message GetNextSPSPathResponseProto { - optional string spsPath = 1; + optional uint64 spsPath = 1; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java index ed1fe926e75..f85769f3fdf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java @@ -45,22 +45,22 @@ import org.mockito.Mockito; */ public class TestBlockStorageMovementAttemptedItems { - private BlockStorageMovementAttemptedItems bsmAttemptedItems; - private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles; + private BlockStorageMovementAttemptedItems bsmAttemptedItems; + private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles; private final int selfRetryTimeout = 500; @Before public void setup() throws Exception { Configuration config = new HdfsConfiguration(); - Context ctxt = Mockito.mock(IntraSPSNameNodeContext.class); - SPSService sps = new StoragePolicySatisfier(config); + Context ctxt = Mockito.mock(IntraSPSNameNodeContext.class); + SPSService sps = new StoragePolicySatisfier(config); Mockito.when(ctxt.isRunning()).thenReturn(true); Mockito.when(ctxt.isInSafeMode()).thenReturn(false); Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true); unsatisfiedStorageMovementFiles = - new BlockStorageMovementNeeded(ctxt, null); - bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps, - unsatisfiedStorageMovementFiles, null); + new BlockStorageMovementNeeded(ctxt); + bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps, + unsatisfiedStorageMovementFiles, ctxt); } @After @@ -76,7 +76,7 @@ public class TestBlockStorageMovementAttemptedItems { long stopTime = monotonicNow() + (retryTimeout * 2); boolean isItemFound = false; while (monotonicNow() < (stopTime)) { - ItemInfo ele = null; + ItemInfo ele = null; while ((ele = unsatisfiedStorageMovementFiles.get()) != null) { if (item == ele.getFile()) { isItemFound = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java index b05717a0bab..ec5307be04a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java @@ -108,8 +108,6 @@ public class TestStoragePolicySatisfier { public static final long CAPACITY = 2 * 256 * 1024 * 1024; public static final String FILE = "/testMoveToSatisfyStoragePolicy"; public static final int DEFAULT_BLOCK_SIZE = 1024; - private ExternalBlockMovementListener blkMoveListener = - new ExternalBlockMovementListener(); /** * Sets hdfs cluster. @@ -1282,8 +1280,8 @@ public class TestStoragePolicySatisfier { //Queue limit can control the traverse logic to wait for some free //entry in queue. After 10 files, traverse control will be on U. - StoragePolicySatisfier sps = new StoragePolicySatisfier(config); - Context ctxt = new IntraSPSNameNodeContext( + StoragePolicySatisfier sps = new StoragePolicySatisfier(config); + Context ctxt = new IntraSPSNameNodeContext( hdfsCluster.getNamesystem(), hdfsCluster.getNamesystem().getBlockManager(), sps) { @Override @@ -1297,8 +1295,7 @@ public class TestStoragePolicySatisfier { } }; - FileCollector fileIDCollector = createFileIdCollector(sps, ctxt); - sps.init(ctxt, fileIDCollector, null, null); + sps.init(ctxt); sps.getStorageMovementQueue().activate(); INode rootINode = fsDir.getINode("/root"); @@ -1314,13 +1311,6 @@ public class TestStoragePolicySatisfier { dfs.delete(new Path("/root"), true); } - public FileCollector createFileIdCollector( - StoragePolicySatisfier sps, Context ctxt) { - FileCollector fileIDCollector = new IntraSPSNameNodeFileIdCollector( - hdfsCluster.getNamesystem().getFSDirectory(), sps); - return fileIDCollector; - } - /** * Test traverse when root parent got deleted. * 1. Delete L when traversing Q @@ -1351,8 +1341,8 @@ public class TestStoragePolicySatisfier { // Queue limit can control the traverse logic to wait for some free // entry in queue. After 10 files, traverse control will be on U. - StoragePolicySatisfier sps = new StoragePolicySatisfier(config); - Context ctxt = new IntraSPSNameNodeContext( + StoragePolicySatisfier sps = new StoragePolicySatisfier(config); + Context ctxt = new IntraSPSNameNodeContext( hdfsCluster.getNamesystem(), hdfsCluster.getNamesystem().getBlockManager(), sps) { @Override @@ -1365,8 +1355,7 @@ public class TestStoragePolicySatisfier { return true; } }; - FileCollector fileIDCollector = createFileIdCollector(sps, ctxt); - sps.init(ctxt, fileIDCollector, null, null); + sps.init(ctxt); sps.getStorageMovementQueue().activate(); INode rootINode = fsDir.getINode("/root"); @@ -1383,12 +1372,12 @@ public class TestStoragePolicySatisfier { } private void assertTraversal(List expectedTraverseOrder, - FSDirectory fsDir, StoragePolicySatisfier sps) + FSDirectory fsDir, StoragePolicySatisfier sps) throws InterruptedException { // Remove 10 element and make queue free, So other traversing will start. for (int i = 0; i < 10; i++) { String path = expectedTraverseOrder.remove(0); - ItemInfo itemInfo = sps.getStorageMovementQueue().get(); + ItemInfo itemInfo = sps.getStorageMovementQueue().get(); if (itemInfo == null) { continue; } @@ -1403,7 +1392,7 @@ public class TestStoragePolicySatisfier { // Check other element traversed in order and E, M, U, R, S should not be // added in queue which we already removed from expected list for (String path : expectedTraverseOrder) { - ItemInfo itemInfo = sps.getStorageMovementQueue().get(); + ItemInfo itemInfo = sps.getStorageMovementQueue().get(); if (itemInfo == null) { continue; } @@ -1717,17 +1706,17 @@ public class TestStoragePolicySatisfier { public void waitForAttemptedItems(long expectedBlkMovAttemptedCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = - (StoragePolicySatisfier) blockManager.getSPSManager() + final StoragePolicySatisfier sps = + (StoragePolicySatisfier) blockManager.getSPSManager() .getInternalSPSService(); GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", expectedBlkMovAttemptedCount, - ((BlockStorageMovementAttemptedItems) (sps + ((BlockStorageMovementAttemptedItems) (sps .getAttemptedItemsMonitor())).getAttemptedItemsCount()); - return ((BlockStorageMovementAttemptedItems) (sps + return ((BlockStorageMovementAttemptedItems) (sps .getAttemptedItemsMonitor())) .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; } @@ -1737,15 +1726,17 @@ public class TestStoragePolicySatisfier { public void waitForBlocksMovementAttemptReport( long expectedMovementFinishedBlocksCount, int timeout) throws TimeoutException, InterruptedException { - Assert.assertNotNull("Didn't set external block move listener", - blkMoveListener); + BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); + final StoragePolicySatisfier sps = + (StoragePolicySatisfier) blockManager.getSPSManager() + .getInternalSPSService(); GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { - int actualCount = blkMoveListener.getActualBlockMovements().size(); + int actualCount = ((BlockStorageMovementAttemptedItems) (sps + .getAttemptedItemsMonitor())).getAttemptedItemsCount(); LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", - expectedMovementFinishedBlocksCount, - actualCount); + expectedMovementFinishedBlocksCount, actualCount); return actualCount >= expectedMovementFinishedBlocksCount; } @@ -1798,29 +1789,12 @@ public class TestStoragePolicySatisfier { .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn) .storageTypes(storageTypes).storageCapacities(capacities).build(); cluster.waitActive(); - - // Sets external listener for assertion. - blkMoveListener.clear(); - BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = - (StoragePolicySatisfier) blockManager - .getSPSManager().getInternalSPSService(); - sps.setBlockMovementListener(blkMoveListener); return cluster; } public void restartNamenode() throws IOException { hdfsCluster.restartNameNodes(); hdfsCluster.waitActive(); - BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); - StoragePolicySatisfyManager spsMgr = blockManager.getSPSManager(); - if (spsMgr != null && spsMgr.isInternalSatisfierRunning()) { - // Sets external listener for assertion. - blkMoveListener.clear(); - final StoragePolicySatisfier sps = - (StoragePolicySatisfier) spsMgr.getInternalSPSService(); - sps.setBlockMovementListener(blkMoveListener); - } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java index 857bd6cf67f..8a25a5eb933 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier.ExternalBlockMovementListener; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Before; @@ -71,8 +70,6 @@ public class TestStoragePolicySatisfierWithStripedFile { private int cellSize; private int defaultStripeBlockSize; private Configuration conf; - private ExternalBlockMovementListener blkMoveListener = - new ExternalBlockMovementListener(); private ErasureCodingPolicy getEcPolicy() { return StripedFileTestUtil.getDefaultECPolicy(); @@ -94,6 +91,8 @@ public class TestStoragePolicySatisfierWithStripedFile { // Reduced refresh cycle to update latest datanodes. conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS, 1000); + conf.setInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, 30); initConfWithStripe(conf, defaultStripeBlockSize); } @@ -135,14 +134,6 @@ public class TestStoragePolicySatisfierWithStripedFile { try { cluster.waitActive(); - // Sets external listener for assertion. - blkMoveListener.clear(); - BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = - (StoragePolicySatisfier) blockManager - .getSPSManager().getInternalSPSService(); - sps.setBlockMovementListener(blkMoveListener); - DistributedFileSystem dfs = cluster.getFileSystem(); dfs.enableErasureCodingPolicy( StripedFileTestUtil.getDefaultECPolicy().getName()); @@ -253,14 +244,6 @@ public class TestStoragePolicySatisfierWithStripedFile { try { cluster.waitActive(); - // Sets external listener for assertion. - blkMoveListener.clear(); - BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = - (StoragePolicySatisfier) blockManager - .getSPSManager().getInternalSPSService(); - sps.setBlockMovementListener(blkMoveListener); - DistributedFileSystem dfs = cluster.getFileSystem(); dfs.enableErasureCodingPolicy( StripedFileTestUtil.getDefaultECPolicy().getName()); @@ -400,10 +383,11 @@ public class TestStoragePolicySatisfierWithStripedFile { fs.satisfyStoragePolicy(fooFile); DFSTestUtil.waitExpectedStorageType(fooFile.toString(), StorageType.ARCHIVE, 5, 30000, cluster.getFileSystem()); - //Start reaming datanodes + //Start remaining datanodes for (int i = numOfDatanodes - 1; i >= 5; i--) { cluster.restartDataNode(list.get(i), false); } + cluster.waitActive(); // verify storage types and locations. waitExpectedStorageType(cluster, fooFile.toString(), fileLen, StorageType.ARCHIVE, 9, 9, 60000); @@ -511,17 +495,17 @@ public class TestStoragePolicySatisfierWithStripedFile { long expectedBlkMovAttemptedCount, int timeout) throws TimeoutException, InterruptedException { BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = - (StoragePolicySatisfier) blockManager + final StoragePolicySatisfier sps = + (StoragePolicySatisfier) blockManager .getSPSManager().getInternalSPSService(); GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", expectedBlkMovAttemptedCount, - ((BlockStorageMovementAttemptedItems) sps + ((BlockStorageMovementAttemptedItems) sps .getAttemptedItemsMonitor()).getAttemptedItemsCount()); - return ((BlockStorageMovementAttemptedItems) sps + return ((BlockStorageMovementAttemptedItems) sps .getAttemptedItemsMonitor()) .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; } @@ -583,12 +567,15 @@ public class TestStoragePolicySatisfierWithStripedFile { private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster, long expectedMoveFinishedBlks, int timeout) throws TimeoutException, InterruptedException { - Assert.assertNotNull("Didn't set external block move listener", - blkMoveListener); + BlockManager blockManager = cluster.getNamesystem().getBlockManager(); + final StoragePolicySatisfier sps = + (StoragePolicySatisfier) blockManager.getSPSManager() + .getInternalSPSService(); GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { - int actualCount = blkMoveListener.getActualBlockMovements().size(); + int actualCount = ((BlockStorageMovementAttemptedItems) (sps + .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount(); LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", expectedMoveFinishedBlks, actualCount); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java index be243cb3bf7..18acb50e78d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -88,10 +88,8 @@ public class TestExternalStoragePolicySatisfier private String principal; private MiniKdc kdc; private File baseDir; - private StoragePolicySatisfier externalSps; + private StoragePolicySatisfier externalSps; private ExternalSPSContext externalCtxt; - private ExternalBlockMovementListener blkMoveListener = - new ExternalBlockMovementListener(); @After public void destroy() throws Exception { @@ -143,16 +141,10 @@ public class TestExternalStoragePolicySatisfier nnc = getNameNodeConnector(getConf()); - externalSps = new StoragePolicySatisfier(getConf()); + externalSps = new StoragePolicySatisfier(getConf()); externalCtxt = new ExternalSPSContext(externalSps, nnc); - blkMoveListener.clear(); - ExternalSPSBlockMoveTaskHandler externalHandler = - new ExternalSPSBlockMoveTaskHandler(conf, nnc, - externalSps); - externalSps.init(externalCtxt, - new ExternalSPSFilePathCollector(externalSps), externalHandler, - blkMoveListener); + externalSps.init(externalCtxt); externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL); return cluster; } @@ -164,16 +156,10 @@ public class TestExternalStoragePolicySatisfier getCluster().restartNameNodes(); getCluster().waitActive(); - externalSps = new StoragePolicySatisfier<>(getConf()); + externalSps = new StoragePolicySatisfier(getConf()); externalCtxt = new ExternalSPSContext(externalSps, nnc); - blkMoveListener.clear(); - ExternalSPSBlockMoveTaskHandler externalHandler = - new ExternalSPSBlockMoveTaskHandler(getConf(), nnc, - externalSps); - externalSps.init(externalCtxt, - new ExternalSPSFilePathCollector(externalSps), externalHandler, - blkMoveListener); + externalSps.init(externalCtxt); externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL); } @@ -206,11 +192,11 @@ public class TestExternalStoragePolicySatisfier public Boolean get() { LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", expectedBlkMovAttemptedCount, - ((BlockStorageMovementAttemptedItems) (externalSps + ((BlockStorageMovementAttemptedItems) (externalSps .getAttemptedItemsMonitor())).getAttemptedItemsCount()); - return ((BlockStorageMovementAttemptedItems) (externalSps + return ((BlockStorageMovementAttemptedItems) (externalSps .getAttemptedItemsMonitor())) - .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; + .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; } }, 100, timeout); } @@ -218,12 +204,11 @@ public class TestExternalStoragePolicySatisfier public void waitForBlocksMovementAttemptReport( long expectedMovementFinishedBlocksCount, int timeout) throws TimeoutException, InterruptedException { - Assert.assertNotNull("Didn't set external block move listener", - blkMoveListener); GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { - int actualCount = blkMoveListener.getActualBlockMovements().size(); + int actualCount = externalSps.getAttemptedItemsMonitor() + .getAttemptedItemsCount(); LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", expectedMovementFinishedBlocksCount, actualCount); return actualCount