From 8467ec24fb74f30371d5a13e893fc56309ee9372 Mon Sep 17 00:00:00 2001
From: Rakesh Radhakrishnan
Date: Fri, 16 Feb 2018 17:01:38 +0530
Subject: [PATCH] HDFS-13110: [SPS]: Reduce the number of APIs in
 NamenodeProtocol used by external satisfier. Contributed by Rakesh R.

---
 ...amenodeProtocolServerSideTranslatorPB.java |  46 +-----
 .../NamenodeProtocolTranslatorPB.java         |  42 +----
 .../hdfs/server/namenode/FSTreeTraverser.java |   2 +-
 .../server/namenode/NameNodeRpcServer.java    |  32 +---
 .../server/namenode/ReencryptionHandler.java  |   2 +-
 .../BlockStorageMovementAttemptedItems.java   |  42 ++---
 .../sps/BlockStorageMovementNeeded.java       | 119 +++++++-------
 .../hdfs/server/namenode/sps/Context.java    |  55 +++----
 ...ileIdCollector.java => FileCollector.java} |  17 +-
 .../namenode/sps/IntraSPSNameNodeContext.java |  39 ++---
 .../sps/IntraSPSNameNodeFileIdCollector.java  |  23 +--
 .../hdfs/server/namenode/sps/ItemInfo.java    |  39 ++---
 .../hdfs/server/namenode/sps/SPSService.java  |  32 ++--
 .../namenode/sps/StoragePolicySatisfier.java  | 129 ++++++++++-----
 .../sps/StoragePolicySatisfyManager.java      |   6 +-
 .../server/protocol/NamenodeProtocol.java     |  24 +--
 .../sps/ExternalSPSBlockMoveTaskHandler.java  |   4 +-
 .../hdfs/server/sps/ExternalSPSContext.java   |  60 +++----
 ...java => ExternalSPSFilePathCollector.java} |  48 +++---
 .../sps/ExternalStoragePolicySatisfier.java   |   7 +-
 .../src/main/proto/NamenodeProtocol.proto     |  27 +---
 ...estBlockStorageMovementAttemptedItems.java |  27 ++--
 .../sps/TestStoragePolicySatisfier.java       |  52 +++---
 ...StoragePolicySatisfierWithStripedFile.java |  15 +-
 .../TestExternalStoragePolicySatisfier.java   | 148 +++++++++++++-----
 25 files changed, 515 insertions(+), 522 deletions(-)
 rename hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/{FileIdCollector.java => FileCollector.java} (74%)
 rename hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/{ExternalSPSFileIDCollector.java => ExternalSPSFilePathCollector.java} (78%)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
index 25eafdf286f..ed176ccf0ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
@@ -35,16 +35,12 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@@ -267,32 +263,21 @@ public class NamenodeProtocolServerSideTranslatorPB implements
   }
 
   @Override
-  public GetNextSPSPathIdResponseProto getNextSPSPathId(
-      RpcController controller, GetNextSPSPathIdRequestProto request)
+  public GetNextSPSPathResponseProto getNextSPSPath(
+      RpcController controller, GetNextSPSPathRequestProto request)
       throws ServiceException {
     try {
-      Long nextSPSPathId = impl.getNextSPSPathId();
-      if (nextSPSPathId == null) {
-        return GetNextSPSPathIdResponseProto.newBuilder().build();
+      String nextSPSPath = impl.getNextSPSPath();
+      if (nextSPSPath == null) {
+        return GetNextSPSPathResponseProto.newBuilder().build();
       }
-      return GetNextSPSPathIdResponseProto.newBuilder().setFileId(nextSPSPathId)
+      return GetNextSPSPathResponseProto.newBuilder().setSpsPath(nextSPSPath)
           .build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }
   }
 
-  @Override
-  public GetFilePathResponseProto getFilePath(RpcController controller,
-      GetFilePathRequestProto request) throws ServiceException {
-    try {
-      return GetFilePathResponseProto.newBuilder()
-          .setSrcPath(impl.getFilePath(request.getFileId())).build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-
   @Override
   public CheckDNSpaceResponseProto checkDNSpaceForScheduling(
       RpcController controller, CheckDNSpaceRequestProto request)
@@ -309,19 +294,4 @@ public class NamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
-
-  @Override
-  public HasLowRedundancyBlocksResponseProto hasLowRedundancyBlocks(
-      RpcController controller, HasLowRedundancyBlocksRequestProto request)
-      throws ServiceException {
-    try {
-      return HasLowRedundancyBlocksResponseProto.newBuilder()
-          .setHasLowRedundancyBlocks(
-              impl.hasLowRedundancyBlocks(request.getInodeId()))
-          .build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index 8bff499650a..d2e97a2d0c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -34,12 +34,10 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeys
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@@ -271,24 +269,13 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
   }
 
   @Override
-  public Long getNextSPSPathId() throws IOException {
-    GetNextSPSPathIdRequestProto req =
-        GetNextSPSPathIdRequestProto.newBuilder().build();
+  public String getNextSPSPath() throws IOException {
+    GetNextSPSPathRequestProto req =
+        GetNextSPSPathRequestProto.newBuilder().build();
     try {
-      GetNextSPSPathIdResponseProto nextSPSPathId =
-          rpcProxy.getNextSPSPathId(NULL_CONTROLLER, req);
-      return nextSPSPathId.hasFileId() ? nextSPSPathId.getFileId() : null;
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-  }
-
-  @Override
-  public String getFilePath(Long inodeId) throws IOException {
-    GetFilePathRequestProto req =
-        GetFilePathRequestProto.newBuilder().setFileId(inodeId).build();
-    try {
-      return rpcProxy.getFilePath(NULL_CONTROLLER, req).getSrcPath();
+      GetNextSPSPathResponseProto nextSPSPath =
+          rpcProxy.getNextSPSPath(NULL_CONTROLLER, req);
+      return nextSPSPath.hasSpsPath() ? nextSPSPath.getSpsPath() : null;
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -308,17 +295,4 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
       throw ProtobufHelper.getRemoteException(e);
     }
   }
-
-  @Override
-  public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
-    HasLowRedundancyBlocksRequestProto req = HasLowRedundancyBlocksRequestProto
-        .newBuilder().setInodeId(inodeId).build();
-    try {
-      return rpcProxy.hasLowRedundancyBlocks(NULL_CONTROLLER, req)
-          .getHasLowRedundancyBlocks();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-  }
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
index a7d633fa663..2acbda4005b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
@@ -310,7 +310,7 @@ public abstract class FSTreeTraverser {
    * @throws IOException
    * @throws InterruptedException
    */
-  protected abstract void submitCurrentBatch(long startId)
+  protected abstract void submitCurrentBatch(Long startId)
       throws IOException, InterruptedException;
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 97f38c71686..6fe38d6c178 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -2561,20 +2561,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override
-  public String getFilePath(Long inodeId) throws IOException {
+  public String getNextSPSPath() throws IOException {
     checkNNStartup();
-    String operationName = "getFilePath";
-    namesystem.checkSuperuserPrivilege(operationName);
-    if (nn.isStandbyState()) {
-      throw new StandbyException("Not supported by Standby Namenode.");
-    }
-    return namesystem.getFilePath(inodeId);
-  }
-
-  @Override
-  public Long getNextSPSPathId() throws IOException {
-    checkNNStartup();
-    String operationName = "getNextSPSPathId";
+    String operationName = "getNextSPSPath";
     namesystem.checkSuperuserPrivilege(operationName);
     if (nn.isStandbyState()) {
       throw new StandbyException("Not supported by Standby Namenode.");
     }
@@ -2588,7 +2577,11 @@ public class NameNodeRpcServer implements NamenodeProtocols {
           + " inside namenode, so external SPS is not allowed to fetch"
          + " the path Ids");
     }
-    return namesystem.getBlockManager().getSPSManager().getNextPathId();
+    Long pathId = namesystem.getBlockManager().getSPSManager().getNextPathId();
+    if (pathId == null) {
+      return null;
+    }
+    return namesystem.getFilePath(pathId);
   }
 
   @Override
@@ -2603,15 +2596,4 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return namesystem.getBlockManager().getDatanodeManager()
         .verifyTargetDatanodeHasSpaceForScheduling(dn, type, estimatedSize);
   }
-
-  @Override
-  public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
-    checkNNStartup();
-    String operationName = "hasLowRedundancyBlocks";
-    namesystem.checkSuperuserPrivilege(operationName);
-    if (nn.isStandbyState()) {
-      throw new StandbyException("Not supported by Standby Namenode.");
-    }
-    return namesystem.getBlockManager().hasLowRedundancyBlocks(inodeId);
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
index feacd74eb6c..c8c8d683bcf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
@@ -702,7 +702,7 @@ public class ReencryptionHandler implements Runnable {
    * @throws InterruptedException
    */
   @Override
-  protected void submitCurrentBatch(final long zoneId) throws IOException,
+  protected void submitCurrentBatch(final Long zoneId) throws IOException,
       InterruptedException {
     if (currentBatch.isEmpty()) {
       return;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index ea7a09359c5..d2f0bb23c19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -45,8 +45,13 @@ import com.google.common.annotations.VisibleForTesting;
  * entries from tracking. If there is no DN reports about movement attempt
  * finished for a longer time period, then such items will retries automatically
  * after timeout. The default timeout would be 5 minutes.
+ *
+ * @param <T>
+ *          is identifier of inode or full path name of inode. Internal sps will
+ *          use the file inodeId for the block movement. External sps will use
+ *          file string path representation for the block movement.
  */
-public class BlockStorageMovementAttemptedItems{
+public class BlockStorageMovementAttemptedItems<T> {
   private static final Logger LOG =
       LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
 
@@ -54,7 +59,7 @@ public class BlockStorageMovementAttemptedItems{
    * A map holds the items which are already taken for blocks movements
    * processing and sent to DNs.
    */
-  private final List<AttemptedItemInfo> storageMovementAttemptedItems;
+  private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems;
   private final List<Block> movementFinishedBlocks;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
@@ -70,11 +75,11 @@ public class BlockStorageMovementAttemptedItems{
   // a request is timed out.
   //
   private long minCheckTimeout = 1 * 60 * 1000; // minimum value
 
-  private BlockStorageMovementNeeded blockStorageMovementNeeded;
-  private final SPSService service;
+  private BlockStorageMovementNeeded<T> blockStorageMovementNeeded;
+  private final SPSService<T> service;
 
-  public BlockStorageMovementAttemptedItems(SPSService service,
-      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
+  public BlockStorageMovementAttemptedItems(SPSService<T> service,
+      BlockStorageMovementNeeded<T> unsatisfiedStorageMovementFiles,
       BlockMovementListener blockMovementListener) {
     this.service = service;
     long recheckTimeout = this.service.getConf().getLong(
@@ -100,7 +105,7 @@ public class BlockStorageMovementAttemptedItems{
    * @param itemInfo
    *          - tracking info
    */
-  public void add(AttemptedItemInfo itemInfo) {
+  public void add(AttemptedItemInfo<T> itemInfo) {
     synchronized (storageMovementAttemptedItems) {
       storageMovementAttemptedItems.add(itemInfo);
     }
@@ -190,25 +195,24 @@ public class BlockStorageMovementAttemptedItems{
   @VisibleForTesting
   void blocksStorageMovementUnReportedItemsCheck() {
     synchronized (storageMovementAttemptedItems) {
-      Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
+      Iterator<AttemptedItemInfo<T>> iter = storageMovementAttemptedItems
           .iterator();
       long now = monotonicNow();
       while (iter.hasNext()) {
-        AttemptedItemInfo itemInfo = iter.next();
+        AttemptedItemInfo<T> itemInfo = iter.next();
         if (now > itemInfo.getLastAttemptedOrReportedTime()
             + selfRetryTimeout) {
-          Long blockCollectionID = itemInfo.getFileId();
+          T file = itemInfo.getFile();
           synchronized (movementFinishedBlocks) {
-            ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
-                blockCollectionID, itemInfo.getRetryCount() + 1);
+            ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(),
+                file, itemInfo.getRetryCount() + 1);
             blockStorageMovementNeeded.add(candidate);
             iter.remove();
             LOG.info("TrackID: {} becomes timed out and moved to needed "
-                + "retries queue for next iteration.", blockCollectionID);
+                + "retries queue for next iteration.", file);
           }
         }
       }
-    }
   }
 
@@ -219,17 +223,17 @@ public class BlockStorageMovementAttemptedItems{
     while (finishedBlksIter.hasNext()) {
       Block blk = finishedBlksIter.next();
       synchronized (storageMovementAttemptedItems) {
-        Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
-            .iterator();
+        Iterator<AttemptedItemInfo<T>> iterator =
+            storageMovementAttemptedItems.iterator();
         while (iterator.hasNext()) {
-          AttemptedItemInfo attemptedItemInfo = iterator.next();
+          AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
           attemptedItemInfo.getBlocks().remove(blk);
           if (attemptedItemInfo.getBlocks().isEmpty()) {
             // TODO: try add this at front of the Queue, so that this element
             // gets the chance first and can be cleaned from queue quickly as
             // all movements already done.
-            blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
-                .getStartId(), attemptedItemInfo.getFileId(),
+            blockStorageMovementNeeded.add(new ItemInfo<T>(attemptedItemInfo
+                .getStartPath(), attemptedItemInfo.getFile(),
                 attemptedItemInfo.getRetryCount() + 1));
             iterator.remove();
           }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index c683a639f74..a194876c76c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -43,31 +43,36 @@ import com.google.common.annotations.VisibleForTesting;
  * schedule the block collection IDs for movement. It track the info of
  * scheduled items and remove the SPS xAttr from the file/Directory once
  * movement is success.
+ *
+ * @param <T>
+ *          is identifier of inode or full path name of inode. Internal sps will
+ *          use the file inodeId for the block movement. External sps will use
+ *          file string path representation for the block movement.
  */
 @InterfaceAudience.Private
-public class BlockStorageMovementNeeded {
+public class BlockStorageMovementNeeded<T> {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
 
-  private final Queue<ItemInfo> storageMovementNeeded =
-      new LinkedList<ItemInfo>();
+  private final Queue<ItemInfo<T>> storageMovementNeeded =
+      new LinkedList<ItemInfo<T>>();
 
   /**
-   * Map of startId and number of child's. Number of child's indicate the
+   * Map of startPath and number of child's. Number of child's indicate the
    * number of files pending to satisfy the policy.
    */
-  private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
-      new HashMap<Long, DirPendingWorkInfo>();
+  private final Map<T, DirPendingWorkInfo> pendingWorkForDirectory =
+      new HashMap<>();
 
-  private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
+  private final Map<T, StoragePolicySatisfyPathStatusInfo> spsStatus =
       new ConcurrentHashMap<>();
 
-  private final Context ctxt;
+  private final Context<T> ctxt;
 
   private Daemon pathIdCollector;
 
-  private FileIdCollector fileIDCollector;
+  private FileCollector<T> fileCollector;
 
   private SPSPathIdProcessor pathIDProcessor;
 
@@ -75,10 +80,10 @@ public class BlockStorageMovementNeeded {
   // NOT_AVAILABLE.
   private static long statusClearanceElapsedTimeMs = 300000;
 
-  public BlockStorageMovementNeeded(Context context,
-      FileIdCollector fileIDCollector) {
+  public BlockStorageMovementNeeded(Context<T> context,
+      FileCollector<T> fileCollector) {
     this.ctxt = context;
-    this.fileIDCollector = fileIDCollector;
+    this.fileCollector = fileCollector;
     pathIDProcessor = new SPSPathIdProcessor();
   }
 
@@ -89,8 +94,8 @@ public class BlockStorageMovementNeeded {
    * @param trackInfo
    *          - track info for satisfy the policy
    */
-  public synchronized void add(ItemInfo trackInfo) {
-    spsStatus.put(trackInfo.getStartId(),
+  public synchronized void add(ItemInfo<T> trackInfo) {
+    spsStatus.put(trackInfo.getFile(),
         new StoragePolicySatisfyPathStatusInfo(
             StoragePolicySatisfyPathStatus.IN_PROGRESS));
     storageMovementNeeded.add(trackInfo);
@@ -100,8 +105,8 @@ public class BlockStorageMovementNeeded {
    * Add the itemInfo list to tracking list for which storage movement expected
    * if necessary.
    *
-   * @param startId
-   *          - start id
+   * @param startPath
+   *          - start path
    * @param itemInfoList
    *          - List of child in the directory
    * @param scanCompleted
   *          -Indicates whether the start id directory has no more elements to
   *          scan.
    */
   @VisibleForTesting
-  public synchronized void addAll(long startId, List<ItemInfo> itemInfoList,
+  public synchronized void addAll(T startPath, List<ItemInfo<T>> itemInfoList,
       boolean scanCompleted) {
     storageMovementNeeded.addAll(itemInfoList);
-    updatePendingDirScanStats(startId, itemInfoList.size(), scanCompleted);
+    updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted);
   }
 
   /**
@@ -126,22 +131,22 @@ public class BlockStorageMovementNeeded {
    *          elements to scan.
    */
   @VisibleForTesting
-  public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
+  public synchronized void add(ItemInfo<T> itemInfo, boolean scanCompleted) {
     storageMovementNeeded.add(itemInfo);
     // This represents sps start id is file, so no need to update pending dir
     // stats.
-    if (itemInfo.getStartId() == itemInfo.getFileId()) {
+    if (itemInfo.getStartPath() == itemInfo.getFile()) {
       return;
     }
-    updatePendingDirScanStats(itemInfo.getStartId(), 1, scanCompleted);
+    updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted);
   }
 
-  private void updatePendingDirScanStats(long startId, int numScannedFiles,
+  private void updatePendingDirScanStats(T startPath, int numScannedFiles,
       boolean scanCompleted) {
-    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath);
     if (pendingWork == null) {
       pendingWork = new DirPendingWorkInfo();
-      pendingWorkForDirectory.put(startId, pendingWork);
+      pendingWorkForDirectory.put(startPath, pendingWork);
     }
     pendingWork.addPendingWorkCount(numScannedFiles);
     if (scanCompleted) {
@@ -150,12 +155,12 @@ public class BlockStorageMovementNeeded {
   }
 
   /**
-   * Gets the block collection id for which storage movements check necessary
+   * Gets the satisfier files for which block storage movements check necessary
    * and make the movement if required.
    *
-   * @return block collection ID
+   * @return satisfier files
    */
-  public synchronized ItemInfo get() {
+  public synchronized ItemInfo<T> get() {
     return storageMovementNeeded.poll();
   }
 
@@ -176,12 +181,12 @@ public class BlockStorageMovementNeeded {
    * Decrease the pending child count for directory once one file blocks moved
    * successfully. Remove the SPS xAttr if pending child count is zero.
    */
-  public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
+  public synchronized void removeItemTrackInfo(ItemInfo<T> trackInfo,
       boolean isSuccess) throws IOException {
     if (trackInfo.isDir()) {
       // If track is part of some start inode then reduce the pending
       // directory work count.
-      long startId = trackInfo.getStartId();
+      T startId = trackInfo.getStartPath();
       if (!ctxt.isFileExist(startId)) {
         // directory deleted just remove it.
         this.pendingWorkForDirectory.remove(startId);
@@ -202,17 +207,17 @@ public class BlockStorageMovementNeeded {
     } else {
       // Remove xAttr if trackID doesn't exist in
       // storageMovementAttemptedItems or file policy satisfied.
-      ctxt.removeSPSHint(trackInfo.getFileId());
-      updateStatus(trackInfo.getStartId(), isSuccess);
+      ctxt.removeSPSHint(trackInfo.getFile());
+      updateStatus(trackInfo.getFile(), isSuccess);
     }
   }
 
-  public synchronized void clearQueue(long trackId) {
+  public synchronized void clearQueue(T trackId) {
     ctxt.removeSPSPathId(trackId);
-    Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
+    Iterator<ItemInfo<T>> iterator = storageMovementNeeded.iterator();
     while (iterator.hasNext()) {
-      ItemInfo next = iterator.next();
-      if (next.getStartId() == trackId) {
+      ItemInfo<T> next = iterator.next();
+      if (next.getFile() == trackId) {
         iterator.remove();
       }
     }
@@ -222,7 +227,7 @@ public class BlockStorageMovementNeeded {
   /**
    * Mark inode status as SUCCESS in map.
    */
-  private void updateStatus(long startId, boolean isSuccess){
+  private void updateStatus(T startId, boolean isSuccess){
     StoragePolicySatisfyPathStatusInfo spsStatusInfo =
         spsStatus.get(startId);
     if (spsStatusInfo == null) {
@@ -244,8 +249,8 @@ public class BlockStorageMovementNeeded {
    */
   public synchronized void clearQueuesWithNotification() {
     // Remove xAttr from directories
-    Long trackId;
-    while ((trackId = ctxt.getNextSPSPathId()) != null) {
+    T trackId;
+    while ((trackId = ctxt.getNextSPSPath()) != null) {
      try {
         // Remove xAttr for file
         ctxt.removeSPSHint(trackId);
@@ -256,17 +261,17 @@ public class BlockStorageMovementNeeded {
 
     // File's directly added to storageMovementNeeded, So try to remove
     // xAttr for file
-    ItemInfo itemInfo;
+    ItemInfo<T> itemInfo;
     while ((itemInfo = get()) != null) {
       try {
         // Remove xAttr for file
         if (!itemInfo.isDir()) {
-          ctxt.removeSPSHint(itemInfo.getFileId());
+          ctxt.removeSPSHint(itemInfo.getFile());
         }
       } catch (IOException ie) {
         LOG.warn(
             "Failed to remove SPS xattr for track id "
-                + itemInfo.getFileId(), ie);
+                + itemInfo.getFile(), ie);
       }
     }
     this.clearAll();
@@ -282,29 +287,29 @@ public class BlockStorageMovementNeeded {
     public void run() {
       LOG.info("Starting SPSPathIdProcessor!.");
       long lastStatusCleanTime = 0;
-      Long startINodeId = null;
+      T startINode = null;
       while (ctxt.isRunning()) {
         try {
           if (!ctxt.isInSafeMode()) {
-            if (startINodeId == null) {
-              startINodeId = ctxt.getNextSPSPathId();
+            if (startINode == null) {
+              startINode = ctxt.getNextSPSPath();
             } // else same id will be retried
-            if (startINodeId == null) {
+            if (startINode == null) {
               // Waiting for SPS path
               Thread.sleep(3000);
             } else {
-              spsStatus.put(startINodeId,
+              spsStatus.put(startINode,
                   new StoragePolicySatisfyPathStatusInfo(
                       StoragePolicySatisfyPathStatus.IN_PROGRESS));
-              fileIDCollector.scanAndCollectFileIds(startINodeId);
+              fileCollector.scanAndCollectFiles(startINode);
               // check if directory was empty and no child added to queue
               DirPendingWorkInfo dirPendingWorkInfo =
-                  pendingWorkForDirectory.get(startINodeId);
+                  pendingWorkForDirectory.get(startINode);
               if (dirPendingWorkInfo != null
                   && dirPendingWorkInfo.isDirWorkDone()) {
-                ctxt.removeSPSHint(startINodeId);
-                pendingWorkForDirectory.remove(startINodeId);
-                updateStatus(startINodeId, true);
+                ctxt.removeSPSHint(startINode);
+                pendingWorkForDirectory.remove(startINode);
+                updateStatus(startINode, true);
               }
             }
             //Clear the SPS status if status is in SUCCESS more than 5 min.
@@ -313,7 +318,7 @@ public class BlockStorageMovementNeeded {
              lastStatusCleanTime = Time.monotonicNow();
              cleanSPSStatus();
            }
-            startINodeId = null; // Current inode id successfully scanned.
+            startINode = null; // Current inode successfully scanned.
           }
         } catch (Throwable t) {
           String reClass = t.getClass().getName();
@@ -334,9 +339,9 @@ public class BlockStorageMovementNeeded {
   }
 
   private synchronized void cleanSPSStatus() {
-    for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
-        spsStatus.entrySet().iterator(); it.hasNext();) {
-      Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
+    for (Iterator<Entry<T, StoragePolicySatisfyPathStatusInfo>> it = spsStatus
+        .entrySet().iterator(); it.hasNext();) {
+      Entry<T, StoragePolicySatisfyPathStatusInfo> entry = it.next();
       if (entry.getValue().canRemove()) {
         it.remove();
       }
@@ -472,8 +477,8 @@ public class BlockStorageMovementNeeded {
     return statusClearanceElapsedTimeMs;
   }
 
-  public void markScanCompletedForDir(Long inodeId) {
-    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inodeId);
+  public void markScanCompletedForDir(T inode) {
+    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode);
     if (pendingWork != null) {
       pendingWork.markScanCompleted();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index ff4ad6ba41e..84a969d711b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -33,11 +33,16 @@ import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
 
 /**
- * An interface for the communication between NameNode and SPS module.
+ * An interface for the communication between SPS and Namenode module.
+ *
+ * @param <T>
+ *          is identifier of inode or full path name of inode. Internal sps will
+ *          use the file inodeId for the block movement. External sps will use
+ *          file string path representation for the block movement.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface Context {
+public interface Context<T> {
 
   /**
    * Returns true if the SPS is running, false otherwise.
@@ -72,13 +77,13 @@ public interface Context {
   NetworkTopology getNetworkTopology();
 
   /**
-   * Returns true if the give Inode exists in the Namespace.
+   * Returns true if the give file exists in the Namespace.
    *
-   * @param inodeId
-   *          - Inode ID
-   * @return true if Inode exists, false otherwise.
+   * @param filePath
+   *          - file info
+   * @return true if the given file exists, false otherwise.
    */
-  boolean isFileExist(long inodeId);
+  boolean isFileExist(T filePath);
 
   /**
    * Gets the storage policy details for the given policy ID.
@@ -97,11 +102,11 @@ public interface Context {
   /**
    * Remove the hint which was added to track SPS call.
    *
-   * @param inodeId
-   *          - Inode ID
+   * @param spsPath
+   *          - user invoked satisfier path
    * @throws IOException
    */
-  void removeSPSHint(long inodeId) throws IOException;
+  void removeSPSHint(T spsPath) throws IOException;
 
   /**
    * Gets the number of live datanodes in the cluster.
@@ -113,11 +118,11 @@ public interface Context {
   /**
    * Get the file info for a specific file.
    *
-   * @param inodeID
-   *          inode identifier
+   * @param file
+   *          file path
    * @return file status metadata information
   */
-  HdfsFileStatus getFileInfo(long inodeID) throws IOException;
+  HdfsFileStatus getFileInfo(T file) throws IOException;
 
   /**
    * Returns all the live datanodes and its storage details.
@@ -127,15 +132,6 @@ public interface Context {
   DatanodeStorageReport[] getLiveDatanodeStorageReport()
       throws IOException;
 
-  /**
-   * Returns true if the given inode file has low redundancy blocks.
-   *
-   * @param inodeID
-   *          inode identifier
-   * @return true if block collection has low redundancy blocks
-   */
-  boolean hasLowRedundancyBlocks(long inodeID);
-
   /**
    * Checks whether the given datanode has sufficient space to occupy the given
    * blockSize data.
@@ -153,26 +149,17 @@ public interface Context {
       long blockSize);
 
   /**
-   * @return next SPS path id to process.
+   * @return next SPS path info to process.
   */
-  Long getNextSPSPathId();
+  T getNextSPSPath();
 
   /**
    * Removes the SPS path id.
    */
-  void removeSPSPathId(long pathId);
+  void removeSPSPathId(T pathId);
 
   /**
    * Removes all SPS path ids.
    */
   void removeAllSPSPathIds();
-
-  /**
-   * Gets the file path for a given inode id.
-   *
-   * @param inodeId
-   *          - path inode id.
-   */
-  String getFilePath(Long inodeId);
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
similarity index 74%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
index 7cf77f06a79..dceb5fa9dec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
@@ -24,20 +24,25 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * An interface for scanning the directory recursively and collect file ids
+ * An interface for scanning the directory recursively and collect files
  * under the given directory.
+ *
+ * @param <T>
+ *          is identifier of inode or full path name of inode. Internal sps will
+ *          use the file inodeId for the block movement. External sps will use
+ *          file string path representation for the block movement.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface FileIdCollector {
+public interface FileCollector<T> {
 
   /**
-   * Scans the given inode directory and collects the file ids under that
+   * This method can be used to scan and collects the files under that
    * directory and adds to the given BlockStorageMovementNeeded.
    *
-   * @param inodeID
-   *          - The directory ID
+   * @param filePath
+   *          - file path
   */
-  void scanAndCollectFileIds(Long inodeId)
+  void scanAndCollectFiles(T filePath)
       throws IOException, InterruptedException;
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index 495d1c4a334..f6b6d9530dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -47,17 +47,17 @@ import org.slf4j.LoggerFactory;
  * movements to satisfy the storage policy.
  */
 @InterfaceAudience.Private
-public class IntraSPSNameNodeContext implements Context {
+public class IntraSPSNameNodeContext implements Context<Long> {
   private static final Logger LOG = LoggerFactory
       .getLogger(IntraSPSNameNodeContext.class);
 
   private final Namesystem namesystem;
   private final BlockManager blockManager;
 
-  private SPSService service;
+  private SPSService<Long> service;
 
   public IntraSPSNameNodeContext(Namesystem namesystem,
-      BlockManager blockManager, SPSService service) {
+      BlockManager blockManager, SPSService<Long> service) {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
     this.service = service;
@@ -68,20 +68,18 @@ public class IntraSPSNameNodeContext implements Context {
     return blockManager.getDatanodeManager().getNumLiveDataNodes();
   }
 
+  /**
+   * @return object containing information regarding the file or null if file
+   *         not found.
+   */
   @Override
-  public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
+  public HdfsFileStatus getFileInfo(Long inodeID) throws IOException {
     String filePath = namesystem.getFilePath(inodeID);
     if (StringUtils.isBlank(filePath)) {
       LOG.debug("File with inodeID:{} doesn't exists!", inodeID);
       return null;
     }
-    HdfsFileStatus fileInfo = null;
-    try {
-      fileInfo = namesystem.getFileInfo(filePath, true, true);
-    } catch (IOException e) {
-      LOG.debug("File path:{} doesn't exists!", filePath);
-    }
-    return fileInfo;
+    return namesystem.getFileInfo(filePath, true, true);
   }
 
   @Override
@@ -97,17 +95,12 @@ public class IntraSPSNameNodeContext implements Context {
   }
 
   @Override
-  public boolean hasLowRedundancyBlocks(long inodeId) {
-    return blockManager.hasLowRedundancyBlocks(inodeId);
-  }
-
-  @Override
-  public boolean isFileExist(long inodeId) {
+  public boolean isFileExist(Long inodeId) {
     return namesystem.getFSDirectory().getInode(inodeId) != null;
   }
 
   @Override
-  public void removeSPSHint(long inodeId) throws IOException {
+  public void removeSPSHint(Long inodeId) throws IOException {
     this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
   }
 
@@ -177,12 +170,12 @@ public class IntraSPSNameNodeContext implements Context {
   }
 
   @Override
-  public Long getNextSPSPathId() {
+  public Long getNextSPSPath() {
     return blockManager.getSPSManager().getNextPathId();
   }
 
   @Override
-  public void removeSPSPathId(long trackId) {
+  public void removeSPSPathId(Long trackId) {
     blockManager.getSPSManager().removePathId(trackId);
   }
 
@@ -190,10 +183,4 @@ public class IntraSPSNameNodeContext implements Context {
   public void removeAllSPSPathIds() {
     blockManager.getSPSManager().removeAllPathIds();
   }
-
-  @Override
-  public String getFilePath(Long inodeId) {
-    return namesystem.getFilePath(inodeId);
-  }
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
index 7a44dd93eaf..27d9e7d6f84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
@@ -35,15 +35,16 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
  */
 @InterfaceAudience.Private
 public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
-    implements FileIdCollector {
+    implements FileCollector<Long> {
   private int maxQueueLimitToScan;
-  private final SPSService service;
+  private final SPSService<Long> service;
 
   private int remainingCapacity = 0;
 
-  private List<ItemInfo> currentBatch;
+  private List<ItemInfo<Long>> currentBatch;
 
-  public IntraSPSNameNodeFileIdCollector(FSDirectory dir, SPSService service) {
+  public IntraSPSNameNodeFileIdCollector(FSDirectory dir,
+      SPSService<Long> service) {
     super(dir);
     this.service = service;
     this.maxQueueLimitToScan = service.getConf().getInt(
@@ -63,7 +64,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
       return false;
     }
     if (inode.isFile() && inode.asFile().numBlocks() != 0) {
-      currentBatch.add(new ItemInfo(
+      currentBatch.add(new ItemInfo<Long>(
           ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
       remainingCapacity--;
     }
@@ -83,10 +84,10 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
   }
 
   @Override
-  protected void submitCurrentBatch(long startId)
+  protected void submitCurrentBatch(Long startId)
       throws IOException, InterruptedException {
     // Add current child's to queue
-    service.addAllFileIdsToProcess(startId,
+    service.addAllFilesToProcess(startId,
         currentBatch, false);
     currentBatch.clear();
   }
@@ -119,7 +120,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
   }
 
   @Override
-  public void scanAndCollectFileIds(final Long startINodeId)
+  public void scanAndCollectFiles(final Long startINodeId)
       throws IOException, InterruptedException {
     FSDirectory fsd = getFSDirectory();
     INode startInode = fsd.getInode(startINodeId);
@@ -129,9 +130,9 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
         throttle();
       }
       if (startInode.isFile()) {
-        currentBatch.add(new ItemInfo(startInode.getId(), startInode.getId()));
+        currentBatch
+            .add(new ItemInfo<Long>(startInode.getId(), startInode.getId()));
       } else {
-
         readLock();
         // NOTE: this lock will not be held for full directory scanning. It is
         // basically a sliced locking. Once it collects a batch size( at max the
@@ -148,7 +149,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
         }
       }
       // Mark startInode traverse is done, this is last-batch
-      service.addAllFileIdsToProcess(startInode.getId(), currentBatch, true);
+      service.addAllFilesToProcess(startInode.getId(), currentBatch, true);
       currentBatch.clear();
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
index 47c64cc9500..bd8ab92ad0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
@@ -21,48 +21,51 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * ItemInfo is a file info object for which need to satisfy the policy.
+ * ItemInfo is a file info object for which need to satisfy the policy. For
+ * internal satisfier service, it uses inode id which is Long datatype. For the
+ * external satisfier service, it uses the full string representation of the
+ * path.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class ItemInfo {
-  private long startId;
-  private long fileId;
+public class ItemInfo<T> {
+  private T startPath;
+  private T file;
   private int retryCount;
 
-  public ItemInfo(long startId, long fileId) {
-    this.startId = startId;
-    this.fileId = fileId;
+  public ItemInfo(T startPath, T file) {
+    this.startPath = startPath;
+    this.file = file;
     // set 0 when item is getting added first time in queue.
     this.retryCount = 0;
   }
 
-  public ItemInfo(final long startId, final long fileId, final int retryCount) {
-    this.startId = startId;
-    this.fileId = fileId;
+  public ItemInfo(final T startPath, final T file, final int retryCount) {
+    this.startPath = startPath;
+    this.file = file;
     this.retryCount = retryCount;
   }
 
   /**
-   * Return the start inode id of the current track Id. This indicates that SPS
-   * was invoked on this inode id.
+   * Returns the start path of the current file. This indicates that SPS
+   * was invoked on this path.
    */
-  public long getStartId() {
-    return startId;
+  public T getStartPath() {
+    return startPath;
   }
 
   /**
-   * Return the File inode Id for which needs to satisfy the policy.
+   * Returns the file for which needs to satisfy the policy.
    */
-  public long getFileId() {
-    return fileId;
+  public T getFile() {
+    return file;
   }
 
   /**
    * Returns true if the tracking path is a directory, false otherwise.
    */
   public boolean isDir() {
-    return (startId != fileId);
+    return !startPath.equals(file);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index da6e3659698..71d8fd124b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -27,10 +27,15 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 
 /**
  * An interface for SPSService, which exposes life cycle and processing APIs.
+ *
+ * @param <T>
+ *          is identifier of inode or full path name of inode. Internal sps will
+ *          use the file inodeId for the block movement. External sps will use
+ *          file string path representation for the block movement.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface SPSService {
+public interface SPSService<T> {
 
   /**
    * Initializes the helper services.
    *
    * @param ctxt
    *          - context is an helper service to provide communication channel
    *          between NN and SPS
-   * @param fileIDCollector
+   * @param fileCollector
    *          - a helper service for scanning the files under a given directory
    *          id
    * @param handler
    *          - a helper service for moving the blocks
    * @param blkMovementListener
    *          - listener to know about block movement attempt completion
    */
-  void init(Context ctxt, FileIdCollector fileIDCollector,
+  void init(Context<T> ctxt, FileCollector<T> fileCollector,
       BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener);
 
   /**
@@ -82,23 +87,24 @@ public interface SPSService {
   boolean isRunning();
 
   /**
-   * Adds the Item information(file id etc) to processing queue.
+   * Adds the Item information(file etc) to processing queue.
    *
    * @param itemInfo
+   *          file info object for which need to satisfy the policy
    */
-  void addFileIdToProcess(ItemInfo itemInfo, boolean scanCompleted);
+  void addFileToProcess(ItemInfo<T> itemInfo, boolean scanCompleted);
 
   /**
-   * Adds all the Item information(file id etc) to processing queue.
+   * Adds all the Item information(file etc) to processing queue.
    *
-   * @param startId
-   *          - directory/file id, on which SPS was called.
+   * @param startPath
+   *          - directory/file, on which SPS was called.
    * @param itemInfoList
    *          - list of item infos
    * @param scanCompleted
    *          - whether the scanning of directory fully done with itemInfoList
    */
-  void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList,
+  void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
       boolean scanCompleted);
 
   /**
@@ -109,7 +115,7 @@ public interface SPSService {
   /**
    * Clear inodeId present in the processing queue.
    */
-  void clearQueue(long inodeId);
+  void clearQueue(T spsPath);
 
   /**
    * @return the configuration.
@@ -119,10 +125,10 @@ public interface SPSService {
   /**
    * Marks the scanning of directory if finished.
    *
-   * @param inodeId
-   *          - directory inode id.
+   * @param spsPath
+   *          - satisfier path
    */
-  void markScanCompletedForPath(Long inodeId);
+  void markScanCompletedForPath(T spsPath);
 
   /**
    * Notify the details of storage movement attempt finished blocks.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 6b449aa9973..08a26e143ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -66,7 +66,7 @@ import com.google.common.base.Preconditions;
 * storage policy type in Namespace, but physical block storage movement will
 * not happen until user runs "Mover Tool" explicitly for such files. The
 * StoragePolicySatisfier Daemon thread implemented for addressing the case
- * where users may want to physically move the blocks by a dedidated daemon (can
+ * where users may want to physically move the blocks by a dedicated daemon (can
 * run inside Namenode or stand alone) instead of running mover tool explicitly.
 * Just calling client API to satisfyStoragePolicy on a file/dir will
 * automatically trigger to move its physical storage locations as expected in
@@ -77,19 +77,19 @@ import com.google.common.base.Preconditions;
 * physical block movements.
 */
 @InterfaceAudience.Private
-public class StoragePolicySatisfier implements SPSService, Runnable {
+public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   public static final Logger LOG =
       LoggerFactory.getLogger(StoragePolicySatisfier.class);
   private Daemon storagePolicySatisfierThread;
-  private BlockStorageMovementNeeded storageMovementNeeded;
-  private BlockStorageMovementAttemptedItems storageMovementsMonitor;
+  private BlockStorageMovementNeeded<T> storageMovementNeeded;
+  private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor;
   private volatile boolean isRunning = false;
   private volatile StoragePolicySatisfierMode spsMode =
       StoragePolicySatisfierMode.NONE;
   private int spsWorkMultiplier;
   private long blockCount = 0L;
   private int blockMovementMaxRetry;
-  private Context ctxt;
+  private Context<T> ctxt;
   private BlockMoveTaskHandler blockMoveTaskHandler;
   private final Configuration conf;
 
@@ -135,15 +135,15 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
     }
   }
 
-  public void init(final Context context, final FileIdCollector fileIDCollector,
+  public void init(final Context<T> context,
+      final FileCollector<T> fileIDCollector,
       final BlockMoveTaskHandler blockMovementTaskHandler,
       final BlockMovementListener blockMovementListener) {
     this.ctxt = context;
-    this.storageMovementNeeded =
-        new BlockStorageMovementNeeded(context, fileIDCollector);
-    this.storageMovementsMonitor =
-        new BlockStorageMovementAttemptedItems(this,
-            storageMovementNeeded, blockMovementListener);
+    this.storageMovementNeeded = new BlockStorageMovementNeeded<T>(context,
+        fileIDCollector);
+    this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems<T>(
+        this, storageMovementNeeded, blockMovementListener);
     this.blockMoveTaskHandler = blockMovementTaskHandler;
     this.spsWorkMultiplier = getSPSWorkMultiplier(getConf());
     this.blockMovementMaxRetry = getConf().getInt(
@@ -257,24 +257,24 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
         continue;
       }
       try {
+        ItemInfo<T> itemInfo = null;
+        boolean retryItem = false;
         if (!ctxt.isInSafeMode()) {
-          ItemInfo itemInfo = storageMovementNeeded.get();
+          itemInfo = storageMovementNeeded.get();
           if (itemInfo != null) {
             if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
               LOG.info("Failed to satisfy the policy after "
                   + blockMovementMaxRetry + " retries. Removing inode "
-                  + itemInfo.getFileId() + " from the queue");
+                  + itemInfo.getFile() + " from the queue");
               storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
               continue;
             }
-            long trackId = itemInfo.getFileId();
+            T trackId = itemInfo.getFile();
             BlocksMovingAnalysis status = null;
             DatanodeStorageReport[] liveDnReports;
             BlockStoragePolicy existingStoragePolicy;
             // TODO: presently, context internally acquire the lock
             // and returns the result. Need to discuss to move the lock outside?
-            boolean hasLowRedundancyBlocks = ctxt
-                .hasLowRedundancyBlocks(trackId);
             HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId);
             // Check path existence.
             if (fileStatus == null || fileStatus.isDir()) {
@@ -289,7 +289,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
               HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus;
               status = analyseBlocksStorageMovementsAndAssignToDN(file,
-                  hasLowRedundancyBlocks, existingStoragePolicy, liveDnReports);
+                  existingStoragePolicy, liveDnReports);
               switch (status.status) {
               // Just add to monitor, so it will be retried after timeout
               case ANALYSIS_SKIPPED_FOR_RETRY:
                     + "movement attempt finished report", status.status,
                     fileStatus.getPath());
               }
-              this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
-                  .getStartId(), itemInfo.getFileId(), monotonicNow(),
+              this.storageMovementsMonitor.add(new AttemptedItemInfo<T>(
+                  itemInfo.getStartPath(), itemInfo.getFile(), monotonicNow(),
                   status.assignedBlocks, itemInfo.getRetryCount()));
               break;
             case NO_BLOCKS_TARGETS_PAIRED:
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Adding trackID:{} for the file path:{} back to"
                     + " retry queue as none of the blocks found its eligible"
                     + " targets.", trackId, fileStatus.getPath());
               }
-              itemInfo.increRetryCount();
-              this.storageMovementNeeded.add(itemInfo);
+              retryItem = true;
               break;
             case FEW_LOW_REDUNDANCY_BLOCKS:
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Adding trackID:{} for the file path:{} back to "
                     + "retry queue as some of the blocks are low redundant.",
                     trackId, fileStatus.getPath());
               }
-              itemInfo.increRetryCount();
-              this.storageMovementNeeded.add(itemInfo);
+              retryItem = true;
               break;
             case BLOCKS_FAILED_TO_MOVE:
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Adding trackID:{} for the file path:{} back to "
                     + "retry queue as some of the blocks movement failed.",
                     trackId, fileStatus.getPath());
               }
-              this.storageMovementNeeded.add(itemInfo);
+              retryItem = true;
               break;
             // Just clean Xattrs
             case BLOCKS_TARGET_PAIRING_SKIPPED:
@@ -354,6 +352,10 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
           Thread.sleep(3000);
           blockCount = 0L;
         }
+        if (retryItem) {
+          itemInfo.increRetryCount();
+          this.storageMovementNeeded.add(itemInfo);
+        }
       } catch (IOException e) {
         LOG.error("Exception during StoragePolicySatisfier execution - "
             + "will continue next cycle", e);
@@ -377,7 +379,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   }
 
   private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
-      HdfsLocatedFileStatus fileInfo, boolean hasLowRedundancyBlocks,
+      HdfsLocatedFileStatus fileInfo,
       BlockStoragePolicy existingStoragePolicy,
       DatanodeStorageReport[] liveDns) {
     BlocksMovingAnalysis.Status status =
@@ -403,9 +405,17 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
           new ArrayList<>());
     }
     List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
-
+    boolean hasLowRedundancyBlocks = false;
+    int replication = fileInfo.getReplication();
     for (int i = 0; i < blocks.size(); i++) {
       LocatedBlock blockInfo = blocks.get(i);
+
+      // Block is considered as low redundancy when the block locations array
+      // length is less than expected replication factor. If any of the block is
+      // low redundant, then hasLowRedundancyBlocks will be marked as true.
+      hasLowRedundancyBlocks |= isLowRedundancyBlock(blockInfo, replication,
+          ecPolicy);
+
       List<StorageType> expectedStorageTypes;
       if (blockInfo.isStriped()) {
         if (ErasureCodingPolicyManager
@@ -446,13 +456,15 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
           // policy.
status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED; } - } else if (hasLowRedundancyBlocks - && status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) { - // Check if the previous block was successfully paired. - status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS; } } + // If there is no block paired and few blocks are low redundant, so marking + // the status as FEW_LOW_REDUNDANCY_BLOCKS. + if (hasLowRedundancyBlocks + && status == BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED) { + status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS; + } List assignedBlockIds = new ArrayList(); for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { // Check for at least one block storage movement has been chosen @@ -470,6 +482,33 @@ public class StoragePolicySatisfier implements SPSService, Runnable { return new BlocksMovingAnalysis(status, assignedBlockIds); } + /** + * The given block is considered as low redundancy when the block locations + * length is less than expected replication factor. For EC blocks, redundancy + * is the summation of data + parity blocks. + * + * @param blockInfo + * block + * @param replication + * replication factor of the given file block + * @param ecPolicy + * erasure coding policy of the given file block + * @return true if the given block is low redundant. + */ + private boolean isLowRedundancyBlock(LocatedBlock blockInfo, int replication, + ErasureCodingPolicy ecPolicy) { + boolean hasLowRedundancyBlock = false; + if (blockInfo.isStriped()) { + // For EC blocks, redundancy is the summation of data + parity blocks. + replication = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(); + } + // block is considered as low redundancy when the block locations length is + // less than expected replication factor. + hasLowRedundancyBlock = blockInfo.getLocations().length < replication ? true + : false; + return hasLowRedundancyBlock; + } + /** * Compute the list of block moving information corresponding to the given * blockId. This will check that each block location of the given block is @@ -863,7 +902,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } @VisibleForTesting - BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() { + public BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() { return storageMovementsMonitor; } @@ -880,7 +919,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { /** * Clear queues for given track id. */ - public void clearQueue(long trackId) { + public void clearQueue(T trackId) { storageMovementNeeded.clearQueue(trackId); } @@ -889,7 +928,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { * attempted or reported time stamp. This is used by * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}. */ - final static class AttemptedItemInfo extends ItemInfo { + final static class AttemptedItemInfo extends ItemInfo { private long lastAttemptedOrReportedTime; private final List blocks; @@ -903,7 +942,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { * @param lastAttemptedOrReportedTime * last attempted or reported time */ - AttemptedItemInfo(long rootId, long trackId, + AttemptedItemInfo(T rootId, T trackId, long lastAttemptedOrReportedTime, List blocks, int retryCount) { super(rootId, trackId, retryCount); @@ -932,24 +971,33 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } + /** + * Returns sps invoked path status. 
+  /**
+   * Returns the storage policy satisfy path status. This method is used by
+   * the internal satisfy storage policy service.
+   *
+   * @param path
+   *          sps path
+   * @return storage policy satisfy path status
+   * @throws IOException
+   */
   public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
       String path) throws IOException {
     return storageMovementNeeded.getStatus(ctxt.getFileID(path));
   }
 
   @Override
-  public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) {
+  public void addFileToProcess(ItemInfo<T> trackInfo, boolean scanCompleted) {
     storageMovementNeeded.add(trackInfo, scanCompleted);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Added track info for inode {} to block "
-          + "storageMovementNeeded queue", trackInfo.getFileId());
+          + "storageMovementNeeded queue", trackInfo.getFile());
     }
   }
 
   @Override
-  public void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList,
+  public void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
       boolean scanCompleted) {
-    getStorageMovementQueue().addAll(startId, itemInfoList, scanCompleted);
+    getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted);
   }
 
   @Override
@@ -963,12 +1011,12 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   }
 
   @VisibleForTesting
-  public BlockStorageMovementNeeded getStorageMovementQueue() {
+  public BlockStorageMovementNeeded<T> getStorageMovementQueue() {
     return storageMovementNeeded;
   }
 
   @Override
-  public void markScanCompletedForPath(Long inodeId) {
+  public void markScanCompletedForPath(T inodeId) {
     getStorageMovementQueue().markScanCompletedForDir(inodeId);
   }
 
@@ -976,7 +1024,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
    * Join main SPS thread.
    */
   public void join() throws InterruptedException {
-    //TODO Add join here on SPS rpc server also
     storagePolicySatisfierThread.join();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
index 5bdf6aeaded..5ec0372c712 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
 public class StoragePolicySatisfyManager {
   private static final Logger LOG = LoggerFactory
       .getLogger(StoragePolicySatisfyManager.class);
-  private final StoragePolicySatisfier spsService;
+  private final StoragePolicySatisfier<Long> spsService;
   private final boolean storagePolicyEnabled;
   private volatile StoragePolicySatisfierMode mode;
   private final Queue<Path> pathsToBeTraveresed;
@@ -84,7 +84,7 @@ public class StoragePolicySatisfyManager {
     pathsToBeTraveresed = new LinkedList<>();
     // instantiate SPS service by just keeps config reference and not starting
     // any supporting threads.
-    spsService = new StoragePolicySatisfier(conf);
+    spsService = new StoragePolicySatisfier<Long>(conf);
     this.namesystem = namesystem;
     this.blkMgr = blkMgr;
   }
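The raw-typed field above becomes StoragePolicySatisfier&lt;Long&gt; because the satisfier is now generic over the file-identifier type: the internal NameNode service tracks Long inode ids, while the external service tracks String paths. A minimal illustration of that pattern (hypothetical interface names, not the HDFS classes):

interface FileTracker<T> {
  void addFileToProcess(T file);
}

final class PrintingTracker<T> implements FileTracker<T> {
  @Override
  public void addFileToProcess(T file) {
    System.out.println("queued: " + file); // stand-in for the real queue
  }
}

final class GenericsDemo {
  public static void main(String[] args) {
    FileTracker<Long> internal = new PrintingTracker<>();   // inode ids
    FileTracker<String> external = new PrintingTracker<>(); // string paths
    internal.addFileToProcess(16386L);
    external.addFileToProcess("/user/data/file1");
  }
}

One implementation thus serves both deployments without casts or id-to-path translation in the core logic.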
@@ -309,7 +309,7 @@
   /**
    * @return internal SPS service instance.
    */
-  public SPSService getInternalSPSService() {
+  public SPSService<Long> getInternalSPSService() {
     return this.spsService;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
index 9f5caddd2f2..615e297cdb3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
@@ -206,21 +206,11 @@ public interface NamenodeProtocol {
   boolean isRollingUpgrade() throws IOException;
 
   /**
-   * Gets the file path for the given file id. This API used by External SPS.
-   *
-   * @param inodeId
-   *          - file inode id.
-   * @return path
-   */
-  @Idempotent
-  String getFilePath(Long inodeId) throws IOException;
-
-  /**
-   * @return Gets the next available sps path id, otherwise null. This API used
+   * @return Gets the next available sps path, otherwise null. This API is used
    * by External SPS.
    */
   @AtMostOnce
-  Long getNextSPSPathId() throws IOException;
+  String getNextSPSPath() throws IOException;
 
   /**
    * Verifies whether the given Datanode has the enough estimated size with
@@ -236,15 +226,5 @@ public interface NamenodeProtocol {
   @Idempotent
   boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
       long estimatedSize) throws IOException;
-
-  /**
-   * Check if any low redundancy blocks for given file id. This API used by
-   * External SPS.
-   *
-   * @param inodeID
-   *          - inode id.
-   */
-  @Idempotent
-  boolean hasLowRedundancyBlocks(long inodeID) throws IOException;
 }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index 4a762649d66..7580ba9f15c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -81,11 +81,11 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
   private final SaslDataTransferClient saslClient;
   private final BlockStorageMovementTracker blkMovementTracker;
   private Daemon movementTrackerThread;
-  private final SPSService service;
+  private final SPSService<String> service;
   private final BlockDispatcher blkDispatcher;
 
   public ExternalSPSBlockMoveTaskHandler(Configuration conf,
-      NameNodeConnector nnc, SPSService spsService) {
+      NameNodeConnector nnc, SPSService<String> spsService) {
     int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
         DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
     moveExecutor = initializeBlockMoverThreadPool(moverThreads);
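The NamenodeProtocol change above collapses what used to be two RPCs (getNextSPSPathId followed by getFilePath) into a single path-returning call, and drops hasLowRedundancyBlocks entirely since the satisfier can now derive redundancy from the located blocks itself. A sketch of the client-side effect, using hypothetical stand-in interfaces rather than the real protocol classes:

final class PollLoopSketch {
  interface OldProtocol {
    Long getNextSPSPathId();
    String getFilePath(Long inodeId);
  }

  interface NewProtocol {
    String getNextSPSPath(); // one RPC, returns the path or null
  }

  static String nextPath(OldProtocol p) {
    Long id = p.getNextSPSPathId();               // RPC 1
    return id == null ? null : p.getFilePath(id); // RPC 2
  }

  static String nextPath(NewProtocol p) {
    return p.getNextSPSPath();                    // single RPC
  }

  public static void main(String[] args) {
    OldProtocol old = new OldProtocol() {
      public Long getNextSPSPathId() { return 42L; }
      public String getFilePath(Long id) { return "/data/file-" + id; }
    };
    NewProtocol neu = () -> "/data/file-42";
    System.out.println(nextPath(old)); // two round trips
    System.out.println(nextPath(neu)); // one round trip
  }
}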
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
index c309209dd28..5d0aee65fd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.sps;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -46,15 +48,15 @@ import org.slf4j.LoggerFactory;
  * SPS from Namenode state.
  */
 @InterfaceAudience.Private
-public class ExternalSPSContext implements Context {
+public class ExternalSPSContext implements Context<String> {
   public static final Logger LOG =
       LoggerFactory.getLogger(ExternalSPSContext.class);
-  private SPSService service;
+  private SPSService<String> service;
   private NameNodeConnector nnc = null;
   private BlockStoragePolicySuite createDefaultSuite =
       BlockStoragePolicySuite.createDefaultSuite();
 
-  public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
+  public ExternalSPSContext(SPSService<String> service,
      NameNodeConnector nnc) {
     this.service = service;
     this.nnc = nnc;
   }
@@ -110,14 +112,12 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public boolean isFileExist(long inodeId) {
-    String filePath = null;
+  public boolean isFileExist(String filePath) {
     try {
-      filePath = getFilePath(inodeId);
       return nnc.getDistributedFileSystem().exists(new Path(filePath));
     } catch (IllegalArgumentException | IOException e) {
-      LOG.warn("Exception while getting file is for the given path:{} "
-          + "and fileId:{}", filePath, inodeId, e);
+      LOG.warn("Exception while checking existence of the given path:{}",
+          filePath, e);
     }
     return false;
   }
@@ -133,8 +133,8 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public void removeSPSHint(long inodeId) throws IOException {
-    nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)),
+  public void removeSPSHint(String inodeId) throws IOException {
+    nnc.getDistributedFileSystem().removeXAttr(new Path(inodeId),
         HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
   }
 
@@ -150,9 +150,15 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
-    return nnc.getDistributedFileSystem().getClient()
-        .getLocatedFileInfo(getFilePath(inodeID), false);
+  public HdfsFileStatus getFileInfo(String path) throws IOException {
+    HdfsLocatedFileStatus fileInfo = null;
+    try {
+      fileInfo = nnc.getDistributedFileSystem().getClient()
+          .getLocatedFileInfo(path, false);
+    } catch (FileNotFoundException e) {
+      LOG.debug("Path:{} doesn't exist!", path, e);
+    }
+    return fileInfo;
   }
 
   @Override
@@ -161,17 +167,6 @@ public class ExternalSPSContext implements Context {
     return nnc.getLiveDatanodeStorageReport();
   }
 
-  @Override
-  public boolean hasLowRedundancyBlocks(long inodeID) {
-    try {
-      return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID);
-    } catch (IOException e) {
-      LOG.warn("Failed to check whether fileid:{} has low redundancy blocks.",
-          inodeID, e);
-      return false;
-    }
-  }
-
   @Override
   public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
       long estimatedSize) {
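The reworked getFileInfo above treats a missing file as a normal outcome, mapping FileNotFoundException to a null result instead of failing the whole cycle; a file can legitimately disappear between being queued and being processed. A minimal sketch of that pattern (the Lookup interface is a stand-in, not the HDFS client):

import java.io.FileNotFoundException;
import java.io.IOException;

final class FileInfoSketch {
  interface Lookup {
    String locate(String path) throws IOException;
  }

  static String fileInfoOrNull(Lookup client, String path) throws IOException {
    try {
      return client.locate(path);
    } catch (FileNotFoundException e) {
      return null; // an absent file is a non-fatal outcome here
    }
  }

  public static void main(String[] args) throws IOException {
    Lookup missing = p -> { throw new FileNotFoundException(p); };
    System.out.println(fileInfoOrNull(missing, "/tmp/gone")); // prints null
  }
}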
@@ -190,9 +185,9 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public Long getNextSPSPathId() {
+  public String getNextSPSPath() {
     try {
-      return nnc.getNNProtocolConnection().getNextSPSPathId();
+      return nnc.getNNProtocolConnection().getNextSPSPath();
     } catch (IOException e) {
       LOG.warn("Exception while getting next sps path from Namenode.", e);
       return null;
@@ -200,7 +195,7 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public void removeSPSPathId(long pathId) {
+  public void removeSPSPathId(String pathId) {
     // We need not specifically implement for external.
   }
 
@@ -208,15 +203,4 @@ public class ExternalSPSContext implements Context {
   public void removeAllSPSPathIds() {
     // We need not specifically implement for external.
   }
-
-  @Override
-  public String getFilePath(Long inodeId) {
-    try {
-      return nnc.getNNProtocolConnection().getFilePath(inodeId);
-    } catch (IOException e) {
-      LOG.warn("Exception while getting file path id:{} from Namenode.",
-          inodeId, e);
-      return null;
-    }
-  }
 }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
similarity index 78%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
index ff277ba6646..94354751224 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
@@ -28,8 +28,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.namenode.sps.Context;
-import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
+import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector;
 import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.slf4j.Logger;
@@ -38,19 +37,18 @@ import org.slf4j.LoggerFactory;
 /**
  * This class is to scan the paths recursively. If file is directory, then it
  * will scan for files recursively. If the file is non directory, then it will
- * just submit the same file to process.
+ * just submit the same file to process. This uses the string path
+ * representation of files.
  */
 @InterfaceAudience.Private
-public class ExternalSPSFileIDCollector implements FileIdCollector {
+public class ExternalSPSFilePathCollector implements FileCollector<String> {
   public static final Logger LOG =
-      LoggerFactory.getLogger(ExternalSPSFileIDCollector.class);
-  private Context cxt;
+      LoggerFactory.getLogger(ExternalSPSFilePathCollector.class);
   private DistributedFileSystem dfs;
-  private SPSService service;
+  private SPSService<String> service;
   private int maxQueueLimitToScan;
 
-  public ExternalSPSFileIDCollector(Context cxt, SPSService service) {
-    this.cxt = cxt;
+  public ExternalSPSFilePathCollector(SPSService<String> service) {
     this.service = service;
     this.maxQueueLimitToScan = service.getConf().getInt(
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
@@ -74,38 +72,39 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
    * Recursively scan the given path and add the file info to SPS service for
    * processing.
    */
-  private long processPath(long startID, String fullPath) {
+  private long processPath(String startID, String childPath) {
     long pendingWorkCount = 0; // to be satisfied file counter
     for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
       final DirectoryListing children;
       try {
-        children = dfs.getClient().listPaths(fullPath, lastReturnedName, false);
+        children = dfs.getClient().listPaths(childPath, lastReturnedName,
+            false);
       } catch (IOException e) {
-        LOG.warn("Failed to list directory " + fullPath
+        LOG.warn("Failed to list directory " + childPath
            + ". Ignore the directory and continue.", e);
         return pendingWorkCount;
       }
       if (children == null) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("The scanning start dir/sub dir " + fullPath
+          LOG.debug("The scanning start dir/sub dir " + childPath
              + " does not have any children.");
         }
         return pendingWorkCount;
       }
       for (HdfsFileStatus child : children.getPartialListing()) {
+        String childFullPath = child.getFullName(childPath);
         if (child.isFile()) {
-          service.addFileIdToProcess(new ItemInfo(startID, child.getFileId()),
-              false);
+          service.addFileToProcess(
+              new ItemInfo<String>(startID, childFullPath), false);
           checkProcessingQueuesFree();
           pendingWorkCount++; // increment to be satisfied file count
         } else {
-          String fullPathStr = child.getFullName(fullPath);
           if (child.isDirectory()) {
-            if (!fullPathStr.endsWith(Path.SEPARATOR)) {
-              fullPathStr = fullPathStr + Path.SEPARATOR;
+            if (!childFullPath.endsWith(Path.SEPARATOR)) {
+              childFullPath = childFullPath + Path.SEPARATOR;
             }
-            pendingWorkCount += processPath(startID, fullPathStr);
+            pendingWorkCount += processPath(startID, childFullPath);
           }
         }
       }
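processPath above walks the namespace through listPaths: each directory listing is paged via lastReturnedName, plain files are submitted to the SPS service, and subdirectories recurse. A compact sketch of the same traversal shape against java.nio.file (a stand-in for the DFS client, without the paging detail):

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class RecursiveScanSketch {
  static long processPath(Path dir) throws IOException {
    long pendingWorkCount = 0; // to-be-satisfied file counter
    try (DirectoryStream<Path> children = Files.newDirectoryStream(dir)) {
      for (Path child : children) {
        if (Files.isDirectory(child)) {
          pendingWorkCount += processPath(child); // recurse into subdirectory
        } else {
          System.out.println("submit: " + child); // addFileToProcess stand-in
          pendingWorkCount++;
        }
      }
    }
    return pendingWorkCount;
  }

  public static void main(String[] args) throws IOException {
    System.out.println("queued " + processPath(Paths.get(".")) + " files");
  }
}

Returning the queued-file count up the recursion is what lets the caller detect an already-satisfied (empty) subtree.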
@@ -151,12 +150,11 @@
   }
 
   @Override
-  public void scanAndCollectFileIds(Long inodeId) throws IOException {
+  public void scanAndCollectFiles(String path) throws IOException {
     if (dfs == null) {
       dfs = getFS(service.getConf());
     }
-    long pendingSatisfyItemsCount = processPath(inodeId,
-        cxt.getFilePath(inodeId));
+    long pendingSatisfyItemsCount = processPath(path, path);
     // Check whether the given path contains any items to be tracked or any
     // yet-to-be-satisfied paths. If the list is empty, add the given path
     // with an empty list so that the SPS hint can be removed later, as the
     // path already satisfies the storage policy.
     if (pendingSatisfyItemsCount <= 0) {
       LOG.debug("There are no pending items to satisfy the given path:{}",
           path);
       service.addAllFilesToProcess(path, new ArrayList<>(), true);
     } else {
       service.markScanCompletedForPath(path);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
index 33448db6ff2..6fc35ea7e9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -68,7 +68,8 @@ public final class ExternalStoragePolicySatisfier {
       HdfsConfiguration spsConf = new HdfsConfiguration();
       // login with SPS keytab
       secureLogin(spsConf);
-      StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
+      StoragePolicySatisfier<String> sps = new StoragePolicySatisfier<String>(
+          spsConf);
       nnc = getNameNodeConnector(spsConf);
 
       boolean spsRunning;
@@ -86,8 +87,8 @@ public final class ExternalStoragePolicySatisfier {
       ExternalSPSBlockMoveTaskHandler externalHandler =
           new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
       externalHandler.init();
-      sps.init(context, new ExternalSPSFileIDCollector(context, sps),
-          externalHandler, blkMoveListener);
+      sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler,
+          blkMoveListener);
       sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
       if (sps != null) {
         sps.join();
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
index b0e900d4504..b137f2faab9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
@@ -214,11 +214,11 @@ message GetFilePathResponseProto {
   required string srcPath = 1;
 }
 
-message GetNextSPSPathIdRequestProto {
+message GetNextSPSPathRequestProto {
 }
 
-message GetNextSPSPathIdResponseProto {
-  optional uint64 fileId = 1;
+message GetNextSPSPathResponseProto {
+  optional string spsPath = 1;
 }
 
 message CheckDNSpaceRequestProto {
@@ -322,26 +322,15 @@ service NamenodeProtocolService {
       returns (IsRollingUpgradeResponseProto);
 
   /**
-   * Return the corresponding file path for give file id
+   * Return the next sps path from the namenode
    */
-  rpc getFilePath(GetFilePathRequestProto)
-      returns (GetFilePathResponseProto);
+  rpc getNextSPSPath(GetNextSPSPathRequestProto)
+      returns (GetNextSPSPathResponseProto);
 
   /**
-   * Return the sps path id from namenode
-   */
-  rpc getNextSPSPathId(GetNextSPSPathIdRequestProto)
-      returns (GetNextSPSPathIdResponseProto);
-
-  /**
-   * Return the sps path id from namenode
+   * Verifies whether the given Datanode has enough space of the given
+   * storage type for scheduling the block movement.
    */
   rpc checkDNSpaceForScheduling(CheckDNSpaceRequestProto)
       returns (CheckDNSpaceResponseProto);
-
-  /**
-   * check whether given file id has low redundancy blocks.
-   */
-  rpc hasLowRedundancyBlocks(HasLowRedundancyBlocksRequestProto)
-      returns (HasLowRedundancyBlocksResponseProto);
 }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
index 40973397d5c..29af885a0b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -40,22 +40,21 @@ import org.mockito.Mockito;
  */
 public class TestBlockStorageMovementAttemptedItems {
 
-  private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
-  private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
+  private BlockStorageMovementAttemptedItems<Long> bsmAttemptedItems;
+  private BlockStorageMovementNeeded<Long> unsatisfiedStorageMovementFiles;
   private final int selfRetryTimeout = 500;
 
   @Before
   public void setup() throws Exception {
     Configuration config = new HdfsConfiguration();
-    Context ctxt = Mockito.mock(Context.class);
-    SPSService sps = Mockito.mock(StoragePolicySatisfier.class);
-    Mockito.when(sps.getConf()).thenReturn(config);
+    Context<Long> ctxt = Mockito.mock(IntraSPSNameNodeContext.class);
+    SPSService<Long> sps = new StoragePolicySatisfier<Long>(config);
     Mockito.when(ctxt.isRunning()).thenReturn(true);
     Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
     Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
     unsatisfiedStorageMovementFiles =
-        new BlockStorageMovementNeeded(ctxt, null);
-    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps,
+        new BlockStorageMovementNeeded<Long>(ctxt, null);
+    bsmAttemptedItems = new BlockStorageMovementAttemptedItems<Long>(sps,
         unsatisfiedStorageMovementFiles, null);
   }
 
@@ -72,9 +71,9 @@ public class TestBlockStorageMovementAttemptedItems {
     long stopTime = monotonicNow() + (retryTimeout * 2);
     boolean isItemFound = false;
     while (monotonicNow() < (stopTime)) {
-      ItemInfo ele = null;
+      ItemInfo<Long> ele = null;
       while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
-        if (item == ele.getFileId()) {
+        if (item == ele.getFile()) {
           isItemFound = true;
           break;
         }
@@ -97,7 +96,7 @@ public class TestBlockStorageMovementAttemptedItems {
     Long item = new Long(1234);
     List<Block> blocks = new ArrayList<>();
     blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
+    bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
     Block[] blockArray = new Block[blocks.size()];
     blocks.toArray(blockArray);
     bsmAttemptedItems.notifyMovementTriedBlocks(blockArray);
@@ -114,7 +113,7 @@ public class TestBlockStorageMovementAttemptedItems {
     Long item = new Long(1234);
     List<Block> blocks = new ArrayList<>();
     blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
+    bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
     assertEquals("Shouldn't receive result", 0,
         bsmAttemptedItems.getMovementFinishedBlocksCount());
     assertEquals("Item doesn't exist in the attempted list", 1,
@@ -135,7 +134,7 @@ public class TestBlockStorageMovementAttemptedItems {
     blocks.add(new Block(5678L));
     Long trackID = 0L;
     bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+        .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
     bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
@@ -160,7 +159,7 @@ public class TestBlockStorageMovementAttemptedItems {
     List<Block> blocks = new ArrayList<>();
     blocks.add(new Block(item));
     bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+        .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
     bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
@@ -188,7 +187,7 @@ public class TestBlockStorageMovementAttemptedItems {
     List<Block> blocks = new ArrayList<>();
     blocks.add(new Block(item));
     bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+        .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
     bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 6f7fe896902..2a3d0c86ebc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -105,7 +105,7 @@ public class TestStoragePolicySatisfier {
   public static final int NUM_OF_DATANODES = 3;
   public static final int STORAGES_PER_DATANODE = 2;
   public static final long CAPACITY = 2 * 256 * 1024 * 1024;
-  public static final String FILE = "/testMoveWhenStoragePolicyNotSatisfying";
+  public static final String FILE = "/testMoveToSatisfyStoragePolicy";
   public static final int DEFAULT_BLOCK_SIZE = 1024;
 
   /**
@@ -1269,8 +1269,9 @@ public class TestStoragePolicySatisfier {
 
     //Queue limit can control the traverse logic to wait for some free
    //entry in queue. After 10 files, traverse control will be on U.
-    StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
-    Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
+    StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config);
+    Context<Long> ctxt = new IntraSPSNameNodeContext(
+        hdfsCluster.getNamesystem(),
         hdfsCluster.getNamesystem().getBlockManager(), sps) {
       @Override
       public boolean isInSafeMode() {
@@ -1283,7 +1284,7 @@ public class TestStoragePolicySatisfier {
       }
     };
 
-    FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
+    FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt);
     sps.init(ctxt, fileIDCollector, null, null);
     sps.getStorageMovementQueue().activate();
 
@@ -1300,9 +1301,9 @@ public class TestStoragePolicySatisfier {
     dfs.delete(new Path("/root"), true);
   }
 
-  public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps,
-      Context ctxt) {
-    FileIdCollector fileIDCollector = new IntraSPSNameNodeFileIdCollector(
+  public FileCollector<Long> createFileIdCollector(
+      StoragePolicySatisfier<Long> sps, Context<Long> ctxt) {
+    FileCollector<Long> fileIDCollector = new IntraSPSNameNodeFileIdCollector(
         hdfsCluster.getNamesystem().getFSDirectory(), sps);
     return fileIDCollector;
   }
@@ -1337,8 +1338,9 @@ public class TestStoragePolicySatisfier {
 
     // Queue limit can control the traverse logic to wait for some free
    // entry in queue. After 10 files, traverse control will be on U.
-    StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
-    Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
+    StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config);
+    Context<Long> ctxt = new IntraSPSNameNodeContext(
+        hdfsCluster.getNamesystem(),
         hdfsCluster.getNamesystem().getBlockManager(), sps) {
       @Override
       public boolean isInSafeMode() {
@@ -1350,7 +1352,7 @@ public class TestStoragePolicySatisfier {
         return true;
       }
     };
-    FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
+    FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt);
     sps.init(ctxt, fileIDCollector, null, null);
     sps.getStorageMovementQueue().activate();
 
@@ -1368,16 +1370,16 @@ public class TestStoragePolicySatisfier {
   }
 
   private void assertTraversal(List<String> expectedTraverseOrder,
-      FSDirectory fsDir, StoragePolicySatisfier sps)
+      FSDirectory fsDir, StoragePolicySatisfier<Long> sps)
           throws InterruptedException {
     // Remove 10 element and make queue free, So other traversing will start.
     for (int i = 0; i < 10; i++) {
       String path = expectedTraverseOrder.remove(0);
-      ItemInfo itemInfo = sps.getStorageMovementQueue().get();
+      ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get();
       if (itemInfo == null) {
         continue;
       }
-      long trackId = itemInfo.getFileId();
+      Long trackId = itemInfo.getFile();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1388,11 +1390,11 @@ public class TestStoragePolicySatisfier {
     // Check other element traversed in order and E, M, U, R, S should not be
     // added in queue which we already removed from expected list
     for (String path : expectedTraverseOrder) {
-      ItemInfo itemInfo = sps.getStorageMovementQueue().get();
+      ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get();
       if (itemInfo == null) {
         continue;
       }
-      long trackId = itemInfo.getFileId();
+      Long trackId = itemInfo.getFile();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1696,39 +1698,41 @@ public class TestStoragePolicySatisfier {
     return file1;
   }
 
-  private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
+  public void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
       int timeout) throws TimeoutException, InterruptedException {
     BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
-    final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
-        .getSPSManager().getInternalSPSService();
+    final StoragePolicySatisfier<Long> sps =
+        (StoragePolicySatisfier<Long>) blockManager.getSPSManager()
+            .getInternalSPSService();
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
         LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
             expectedBlkMovAttemptedCount,
-            ((BlockStorageMovementAttemptedItems) (sps
+            ((BlockStorageMovementAttemptedItems<Long>) (sps
                 .getAttemptedItemsMonitor())).getAttemptedItemsCount());
-        return ((BlockStorageMovementAttemptedItems) (sps
+        return ((BlockStorageMovementAttemptedItems<Long>) (sps
            .getAttemptedItemsMonitor()))
                .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
       }
     }, 100, timeout);
   }
 
-  private void waitForBlocksMovementAttemptReport(
+  public void waitForBlocksMovementAttemptReport(
       long expectedMovementFinishedBlocksCount, int timeout)
           throws TimeoutException, InterruptedException {
     BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
-    final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
+    final StoragePolicySatisfier<Long> sps =
+        (StoragePolicySatisfier<Long>) blockManager
         .getSPSManager().getInternalSPSService();
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
             expectedMovementFinishedBlocksCount,
-            ((BlockStorageMovementAttemptedItems) (sps
+            ((BlockStorageMovementAttemptedItems<Long>) (sps
                 .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
-        return ((BlockStorageMovementAttemptedItems) (sps
+        return ((BlockStorageMovementAttemptedItems<Long>) (sps
             .getAttemptedItemsMonitor()))
                 .getMovementFinishedBlocksCount()
                     >= expectedMovementFinishedBlocksCount;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
index ef123001e16..a39fb92a571 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
@@ -495,16 +495,17 @@ public class TestStoragePolicySatisfierWithStripedFile {
       long expectedBlkMovAttemptedCount, int timeout)
           throws TimeoutException, InterruptedException {
     BlockManager blockManager = cluster.getNamesystem().getBlockManager();
-    final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
+    final StoragePolicySatisfier<Long> sps =
+        (StoragePolicySatisfier<Long>) blockManager
         .getSPSManager().getInternalSPSService();
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
         LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
             expectedBlkMovAttemptedCount,
-            ((BlockStorageMovementAttemptedItems) sps
+            ((BlockStorageMovementAttemptedItems<Long>) sps
                 .getAttemptedItemsMonitor()).getAttemptedItemsCount());
-        return ((BlockStorageMovementAttemptedItems) sps
+        return ((BlockStorageMovementAttemptedItems<Long>) sps
            .getAttemptedItemsMonitor())
                .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
       }
@@ -567,7 +568,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
       long expectedMoveFinishedBlks, int timeout)
           throws TimeoutException, InterruptedException {
     BlockManager blockManager = cluster.getNamesystem().getBlockManager();
-    final StoragePolicySatisfier sps = (StoragePolicySatisfier) blockManager
+    final StoragePolicySatisfier<Long> sps =
+        (StoragePolicySatisfier<Long>) blockManager
         .getSPSManager().getInternalSPSService();
     Assert.assertNotNull("Failed to get SPS object reference!", sps);
 
@@ -575,9 +577,10 @@ public class TestStoragePolicySatisfierWithStripedFile {
       @Override
       public Boolean get() {
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
-            expectedMoveFinishedBlks, ((BlockStorageMovementAttemptedItems) sps
+            expectedMoveFinishedBlks,
+            ((BlockStorageMovementAttemptedItems<Long>) sps
                 .getAttemptedItemsMonitor()).getMovementFinishedBlocksCount());
-        return ((BlockStorageMovementAttemptedItems) sps
+        return ((BlockStorageMovementAttemptedItems<Long>) sps
            .getAttemptedItemsMonitor())
                .getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks;
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index 0546f39bcee..28e172ab785 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -43,23 +43,23 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
-import org.apache.hadoop.hdfs.server.namenode.sps.Context;
-import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
-import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
 import org.apache.hadoop.http.HttpConfig;
@@ -74,6 +74,8 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import com.google.common.base.Supplier;
+
 /**
  * Tests the external sps service plugins.
  */
@@ -88,6 +90,8 @@ public class TestExternalStoragePolicySatisfier
   private String principal;
   private MiniKdc kdc;
   private File baseDir;
+  private StoragePolicySatisfier<String> externalSps;
+  private ExternalSPSContext externalCtxt;
 
   @After
   public void destroy() throws Exception {
@@ -97,6 +101,14 @@ public class TestExternalStoragePolicySatisfier
     }
   }
 
+  @Override
+  public void shutdownCluster() {
+    if (externalSps != null) {
+      externalSps.stopGracefully();
+    }
+    super.shutdownCluster();
+  }
+
   @Override
   public void setUp() {
     super.setUp();
@@ -131,60 +143,44 @@ public class TestExternalStoragePolicySatisfier
 
     nnc = getNameNodeConnector(getConf());
 
-    BlockManager blkMgr = cluster.getNameNode().getNamesystem()
-        .getBlockManager();
-    SPSService spsService = blkMgr.getSPSManager().getInternalSPSService();
-    spsService.stopGracefully();
-
-    ExternalSPSContext context = new ExternalSPSContext(spsService,
+    externalSps = new StoragePolicySatisfier<String>(getConf());
+    externalCtxt = new ExternalSPSContext(externalSps,
         getNameNodeConnector(conf));
 
     ExternalBlockMovementListener blkMoveListener =
         new ExternalBlockMovementListener();
     ExternalSPSBlockMoveTaskHandler externalHandler =
         new ExternalSPSBlockMoveTaskHandler(conf, nnc,
-            blkMgr.getSPSManager().getInternalSPSService());
+            externalSps);
     externalHandler.init();
-    spsService.init(context,
-        new ExternalSPSFileIDCollector(context,
-            blkMgr.getSPSManager().getInternalSPSService()),
-        externalHandler, blkMoveListener);
-    spsService.start(true, StoragePolicySatisfierMode.EXTERNAL);
+    externalSps.init(externalCtxt,
+        new ExternalSPSFilePathCollector(externalSps), externalHandler,
+        blkMoveListener);
+    externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
     return cluster;
   }
 
   public void restartNamenode() throws IOException {
-    BlockManager blkMgr = getCluster().getNameNode().getNamesystem()
-        .getBlockManager();
-    SPSService spsService = blkMgr.getSPSManager().getInternalSPSService();
-    spsService.stopGracefully();
+    if (externalSps != null) {
+      externalSps.stopGracefully();
+    }
 
     getCluster().restartNameNodes();
     getCluster().waitActive();
-    blkMgr = getCluster().getNameNode().getNamesystem()
-        .getBlockManager();
-    spsService = blkMgr.getSPSManager().getInternalSPSService();
-    spsService.stopGracefully();
+    externalSps = new StoragePolicySatisfier<>(getConf());
 
-    ExternalSPSContext context = new ExternalSPSContext(spsService,
+    externalCtxt = new ExternalSPSContext(externalSps,
         getNameNodeConnector(getConf()));
     ExternalBlockMovementListener blkMoveListener =
         new ExternalBlockMovementListener();
     ExternalSPSBlockMoveTaskHandler externalHandler =
         new ExternalSPSBlockMoveTaskHandler(getConf(), nnc,
-            blkMgr.getSPSManager().getInternalSPSService());
+            externalSps);
     externalHandler.init();
-    spsService.init(context,
-        new ExternalSPSFileIDCollector(context,
-            blkMgr.getSPSManager().getInternalSPSService()),
-        externalHandler, blkMoveListener);
-    spsService.start(true, StoragePolicySatisfierMode.EXTERNAL);
-  }
-
-  @Override
-  public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps,
-      Context ctxt) {
-    return new ExternalSPSFileIDCollector(ctxt, sps);
+    externalSps.init(externalCtxt,
+        new ExternalSPSFilePathCollector(externalSps), externalHandler,
+        blkMoveListener);
+    externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
   }
 
   private class ExternalBlockMovementListener implements BlockMovementListener {
@@ -204,7 +200,7 @@ public class TestExternalStoragePolicySatisfier
       throws IOException {
     final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
     Assert.assertEquals(1, namenodes.size());
-    final Path externalSPSPathId = new Path("/system/tmp.id");
+    final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH;
     NameNodeConnector.checkOtherInstanceRunning(false);
     while (true) {
       try {
@@ -222,6 +218,40 @@ public class TestExternalStoragePolicySatisfier
     }
   }
 
+  public void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
+      int timeout) throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
+            expectedBlkMovAttemptedCount,
+            ((BlockStorageMovementAttemptedItems<String>) (externalSps
+                .getAttemptedItemsMonitor())).getAttemptedItemsCount());
+        return ((BlockStorageMovementAttemptedItems<String>) (externalSps
+            .getAttemptedItemsMonitor()))
+                .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
+      }
+    }, 100, timeout);
+  }
+
+  public void waitForBlocksMovementAttemptReport(
+      long expectedMovementFinishedBlocksCount, int timeout)
+          throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
+            expectedMovementFinishedBlocksCount,
+            ((BlockStorageMovementAttemptedItems<String>) (externalSps
+                .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
+        return ((BlockStorageMovementAttemptedItems<String>) (externalSps
+            .getAttemptedItemsMonitor()))
+                .getMovementFinishedBlocksCount()
+                    >= expectedMovementFinishedBlocksCount;
+      }
+    }, 100, timeout);
+  }
+
   private void initSecureConf(Configuration conf) throws Exception {
     String username = "externalSPS";
     baseDir = GenericTestUtils
@@ -321,10 +351,6 @@ public class TestExternalStoragePolicySatisfier
     List<String> files = new ArrayList<>();
     files.add(FILE);
     DistributedFileSystem fs = getFS();
-    BlockManager blkMgr = getCluster().getNameNode().getNamesystem()
-        .getBlockManager();
-    SPSService spsService = blkMgr.getSPSManager().getInternalSPSService();
-    spsService.stopGracefully(); // stops SPS
 
     // Creates 4 more files. Send all of them for satisfying the storage
     // policy together.
@@ -366,6 +392,28 @@ public class TestExternalStoragePolicySatisfier
     }
   }
 
+  /**
+   * Tests to verify that SPS should be able to start when the Mover ID file
+   * is not held by a Mover. This can be the case when the Mover exits
+   * ungracefully without deleting the ID file from HDFS.
+   */
+  @Test(timeout = 300000)
+  public void testWhenMoverExitsWithoutDeleteMoverIDFile()
+      throws IOException {
+    try {
+      createCluster();
+      // Simulate the case by creating the MOVER_ID file
+      DFSTestUtil.createFile(getCluster().getFileSystem(),
+          HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
+      restartNamenode();
+      boolean running = externalCtxt.isRunning();
+      Assert.assertTrue("SPS should be running as "
+          + "no Mover is really running", running);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   /**
    * This test need not run as external scan is not a batch based scanning
    * right now.
@@ -389,4 +437,20 @@ public class TestExternalStoragePolicySatisfier
   @Ignore("Status is not supported for external SPS. So, ignoring it.")
   public void testMaxRetryForFailedBlock() throws Exception {
   }
+
+  /**
+   * This test is specific to internal SPS. So, ignoring it.
+   */
+  @Ignore("This test is specific to internal SPS. So, ignoring it.")
+  @Override
+  public void testTraverseWhenParentDeleted() throws Exception {
+  }
+
+  /**
+   * This test is specific to internal SPS. So, ignoring it.
+   */
+  @Ignore("This test is specific to internal SPS. So, ignoring it.")
+  @Override
+  public void testTraverseWhenRootParentDeleted() throws Exception {
+  }
 }
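The waitForAttemptedItems and waitForBlocksMovementAttemptReport helpers added above both follow the same poll-until-true shape via GenericTestUtils.waitFor. A minimal re-implementation of that polling loop, assuming the same semantics (fixed check interval, overall timeout):

import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class PollSketch {
  static void waitFor(Supplier<Boolean> check, long intervalMs, long timeoutMs)
      throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (check.get()) {
        return; // condition met within the timeout
      }
      Thread.sleep(intervalMs); // back off before re-checking
    }
    throw new TimeoutException("condition not met within " + timeoutMs + " ms");
  }

  public static void main(String[] args) throws Exception {
    long start = System.currentTimeMillis();
    // Condition becomes true after roughly 300 ms.
    waitFor(() -> System.currentTimeMillis() - start > 300, 100, 5000);
    System.out.println("condition satisfied");
  }
}

Polling with a generous timeout keeps these asynchronous block-movement assertions robust without hard-coding sleep durations into each test.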