HDFS-13381 : [SPS]: Use DFSUtilClient#makePathFromFileId() to prepare satisfier file path. Contributed by Rakesh R.

This commit is contained in:
Uma Maheswara Rao G 2018-07-02 17:22:00 -07:00 committed by Uma Maheswara Rao Gangumalla
parent 2acc50b826
commit 66e8f9b315
27 changed files with 346 additions and 414 deletions

View File

@ -265,7 +265,7 @@ public class NamenodeProtocolServerSideTranslatorPB implements
RpcController controller, GetNextSPSPathRequestProto request) RpcController controller, GetNextSPSPathRequestProto request)
throws ServiceException { throws ServiceException {
try { try {
String nextSPSPath = impl.getNextSPSPath(); Long nextSPSPath = impl.getNextSPSPath();
if (nextSPSPath == null) { if (nextSPSPath == null) {
return GetNextSPSPathResponseProto.newBuilder().build(); return GetNextSPSPathResponseProto.newBuilder().build();
} }

View File

@ -267,7 +267,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
} }
@Override @Override
public String getNextSPSPath() throws IOException { public Long getNextSPSPath() throws IOException {
GetNextSPSPathRequestProto req = GetNextSPSPathRequestProto req =
GetNextSPSPathRequestProto.newBuilder().build(); GetNextSPSPathRequestProto.newBuilder().build();
try { try {

View File

@ -3897,7 +3897,7 @@ public class BlockManager implements BlockStatsMXBean {
private void notifyStorageMovementAttemptFinishedBlk( private void notifyStorageMovementAttemptFinishedBlk(
DatanodeStorageInfo storageInfo, Block block) { DatanodeStorageInfo storageInfo, Block block) {
if (getSPSManager() != null) { if (getSPSManager() != null) {
SPSService<Long> sps = getSPSManager().getInternalSPSService(); SPSService sps = getSPSManager().getInternalSPSService();
if (sps.isRunning()) { if (sps.isRunning()) {
sps.notifyStorageMovementAttemptFinishedBlk( sps.notifyStorageMovementAttemptFinishedBlk(
storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(), storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(),

View File

@ -3202,17 +3202,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return stat; return stat;
} }
@Override
public String getFilePath(Long inodeId) {
readLock();
try {
INode inode = getFSDirectory().getInode(inodeId);
return inode == null ? null : inode.getFullPathName();
} finally {
readUnlock();
}
}
/** /**
* Returns true if the file is closed * Returns true if the file is closed
*/ */

View File

@ -2569,7 +2569,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
} }
@Override @Override
public String getNextSPSPath() throws IOException { public Long getNextSPSPath() throws IOException {
checkNNStartup(); checkNNStartup();
String operationName = "getNextSPSPath"; String operationName = "getNextSPSPath";
namesystem.checkSuperuserPrivilege(operationName); namesystem.checkSuperuserPrivilege(operationName);
@ -2589,10 +2589,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("SPS service mode is " + spsMode + ", so " throw new IOException("SPS service mode is " + spsMode + ", so "
+ "external SPS service is not allowed to fetch the path Ids"); + "external SPS service is not allowed to fetch the path Ids");
} }
Long pathId = spsMgr.getNextPathId(); return namesystem.getBlockManager().getSPSManager().getNextPathId();
if (pathId == null) {
return null;
}
return namesystem.getFilePath(pathId);
} }
} }

View File

@ -77,13 +77,4 @@ public interface Namesystem extends RwLock, SafeMode {
*/ */
HdfsFileStatus getFileInfo(String filePath, boolean resolveLink, HdfsFileStatus getFileInfo(String filePath, boolean resolveLink,
boolean needLocation) throws IOException; boolean needLocation) throws IOException;
/**
* Gets the file path corresponds to the given file id.
*
* @param inodeId
* file id
* @return string file path
*/
String getFilePath(Long inodeId);
} }

View File

@ -52,13 +52,8 @@ import com.google.common.annotations.VisibleForTesting;
* entries from tracking. If there is no DN reports about movement attempt * entries from tracking. If there is no DN reports about movement attempt
* finished for a longer time period, then such items will retries automatically * finished for a longer time period, then such items will retries automatically
* after timeout. The default timeout would be 5 minutes. * after timeout. The default timeout would be 5 minutes.
*
* @param <T>
* is identifier of inode or full path name of inode. Internal sps will
* use the file inodeId for the block movement. External sps will use
* file string path representation for the block movement.
*/ */
public class BlockStorageMovementAttemptedItems<T> { public class BlockStorageMovementAttemptedItems {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class); LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
@ -66,14 +61,14 @@ public class BlockStorageMovementAttemptedItems<T> {
* A map holds the items which are already taken for blocks movements * A map holds the items which are already taken for blocks movements
* processing and sent to DNs. * processing and sent to DNs.
*/ */
private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems; private final List<AttemptedItemInfo> storageMovementAttemptedItems;
private Map<Block, Set<StorageTypeNodePair>> scheduledBlkLocs; private Map<Block, Set<StorageTypeNodePair>> scheduledBlkLocs;
// Maintains separate Queue to keep the movement finished blocks. This Q // Maintains separate Queue to keep the movement finished blocks. This Q
// is used to update the storageMovementAttemptedItems list asynchronously. // is used to update the storageMovementAttemptedItems list asynchronously.
private final BlockingQueue<Block> movementFinishedBlocks; private final BlockingQueue<Block> movementFinishedBlocks;
private volatile boolean monitorRunning = true; private volatile boolean monitorRunning = true;
private Daemon timerThread = null; private Daemon timerThread = null;
private BlockMovementListener blkMovementListener; private final Context context;
// //
// It might take anywhere between 5 to 10 minutes before // It might take anywhere between 5 to 10 minutes before
// a request is timed out. // a request is timed out.
@ -85,12 +80,12 @@ public class BlockStorageMovementAttemptedItems<T> {
// a request is timed out. // a request is timed out.
// //
private long minCheckTimeout = 1 * 60 * 1000; // minimum value private long minCheckTimeout = 1 * 60 * 1000; // minimum value
private BlockStorageMovementNeeded<T> blockStorageMovementNeeded; private BlockStorageMovementNeeded blockStorageMovementNeeded;
private final SPSService<T> service; private final SPSService service;
public BlockStorageMovementAttemptedItems(SPSService<T> service, public BlockStorageMovementAttemptedItems(SPSService service,
BlockStorageMovementNeeded<T> unsatisfiedStorageMovementFiles, BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
BlockMovementListener blockMovementListener) { Context context) {
this.service = service; this.service = service;
long recheckTimeout = this.service.getConf().getLong( long recheckTimeout = this.service.getConf().getLong(
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
@ -106,19 +101,27 @@ public class BlockStorageMovementAttemptedItems<T> {
storageMovementAttemptedItems = new ArrayList<>(); storageMovementAttemptedItems = new ArrayList<>();
scheduledBlkLocs = new HashMap<>(); scheduledBlkLocs = new HashMap<>();
movementFinishedBlocks = new LinkedBlockingQueue<>(); movementFinishedBlocks = new LinkedBlockingQueue<>();
this.blkMovementListener = blockMovementListener; this.context = context;
} }
/** /**
* Add item to block storage movement attempted items map which holds the * Add item to block storage movement attempted items map which holds the
* tracking/blockCollection id versus time stamp. * tracking/blockCollection id versus time stamp.
* *
* @param itemInfo * @param startPathId
* - tracking info * - start satisfier path identifier
* @param fileId
* - file identifier
* @param monotonicNow
* - time now
* @param assignedBlocks
* - assigned blocks for block movement
* @param retryCount
* - retry count
*/ */
public void add(T startPath, T file, long monotonicNow, public void add(long startPathId, long fileId, long monotonicNow,
Map<Block, Set<StorageTypeNodePair>> assignedBlocks, int retryCount) { Map<Block, Set<StorageTypeNodePair>> assignedBlocks, int retryCount) {
AttemptedItemInfo<T> itemInfo = new AttemptedItemInfo<T>(startPath, file, AttemptedItemInfo itemInfo = new AttemptedItemInfo(startPathId, fileId,
monotonicNow, assignedBlocks.keySet(), retryCount); monotonicNow, assignedBlocks.keySet(), retryCount);
synchronized (storageMovementAttemptedItems) { synchronized (storageMovementAttemptedItems) {
storageMovementAttemptedItems.add(itemInfo); storageMovementAttemptedItems.add(itemInfo);
@ -161,11 +164,9 @@ public class BlockStorageMovementAttemptedItems<T> {
boolean foundType = dn.getStorageType().equals(type); boolean foundType = dn.getStorageType().equals(type);
if (foundDn && foundType) { if (foundDn && foundType) {
blkLocs.remove(dn); blkLocs.remove(dn);
// listener if it is plugged-in Block[] mFinishedBlocks = new Block[1];
if (blkMovementListener != null) { mFinishedBlocks[0] = reportedBlock;
blkMovementListener context.notifyMovementTriedBlocks(mFinishedBlocks);
.notifyMovementTriedBlocks(new Block[] {reportedBlock});
}
// All the block locations has reported. // All the block locations has reported.
if (blkLocs.size() <= 0) { if (blkLocs.size() <= 0) {
movementFinishedBlocks.add(reportedBlock); movementFinishedBlocks.add(reportedBlock);
@ -244,15 +245,15 @@ public class BlockStorageMovementAttemptedItems<T> {
@VisibleForTesting @VisibleForTesting
void blocksStorageMovementUnReportedItemsCheck() { void blocksStorageMovementUnReportedItemsCheck() {
synchronized (storageMovementAttemptedItems) { synchronized (storageMovementAttemptedItems) {
Iterator<AttemptedItemInfo<T>> iter = storageMovementAttemptedItems Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
.iterator(); .iterator();
long now = monotonicNow(); long now = monotonicNow();
while (iter.hasNext()) { while (iter.hasNext()) {
AttemptedItemInfo<T> itemInfo = iter.next(); AttemptedItemInfo itemInfo = iter.next();
if (now > itemInfo.getLastAttemptedOrReportedTime() if (now > itemInfo.getLastAttemptedOrReportedTime()
+ selfRetryTimeout) { + selfRetryTimeout) {
T file = itemInfo.getFile(); long file = itemInfo.getFile();
ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(), file, ItemInfo candidate = new ItemInfo(itemInfo.getStartPath(), file,
itemInfo.getRetryCount() + 1); itemInfo.getRetryCount() + 1);
blockStorageMovementNeeded.add(candidate); blockStorageMovementNeeded.add(candidate);
iter.remove(); iter.remove();
@ -272,13 +273,13 @@ public class BlockStorageMovementAttemptedItems<T> {
// Update attempted items list // Update attempted items list
for (Block blk : finishedBlks) { for (Block blk : finishedBlks) {
synchronized (storageMovementAttemptedItems) { synchronized (storageMovementAttemptedItems) {
Iterator<AttemptedItemInfo<T>> iterator = storageMovementAttemptedItems Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
.iterator(); .iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
AttemptedItemInfo<T> attemptedItemInfo = iterator.next(); AttemptedItemInfo attemptedItemInfo = iterator.next();
attemptedItemInfo.getBlocks().remove(blk); attemptedItemInfo.getBlocks().remove(blk);
if (attemptedItemInfo.getBlocks().isEmpty()) { if (attemptedItemInfo.getBlocks().isEmpty()) {
blockStorageMovementNeeded.add(new ItemInfo<T>( blockStorageMovementNeeded.add(new ItemInfo(
attemptedItemInfo.getStartPath(), attemptedItemInfo.getFile(), attemptedItemInfo.getStartPath(), attemptedItemInfo.getFile(),
attemptedItemInfo.getRetryCount() + 1)); attemptedItemInfo.getRetryCount() + 1));
iterator.remove(); iterator.remove();
@ -309,15 +310,4 @@ public class BlockStorageMovementAttemptedItems<T> {
scheduledBlkLocs.clear(); scheduledBlkLocs.clear();
} }
} }
/**
* Sets external listener for testing.
*
* @param blkMoveListener
* block movement listener callback object
*/
@VisibleForTesting
void setBlockMovementListener(BlockMovementListener blkMoveListener) {
this.blkMovementListener = blkMoveListener;
}
} }

View File

@ -43,47 +43,38 @@ import com.google.common.annotations.VisibleForTesting;
* schedule the block collection IDs for movement. It track the info of * schedule the block collection IDs for movement. It track the info of
* scheduled items and remove the SPS xAttr from the file/Directory once * scheduled items and remove the SPS xAttr from the file/Directory once
* movement is success. * movement is success.
*
* @param <T>
* is identifier of inode or full path name of inode. Internal sps will
* use the file inodeId for the block movement. External sps will use
* file string path representation for the block movement.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class BlockStorageMovementNeeded<T> { public class BlockStorageMovementNeeded {
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementNeeded.class); LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
private final Queue<ItemInfo<T>> storageMovementNeeded = private final Queue<ItemInfo> storageMovementNeeded =
new LinkedList<ItemInfo<T>>(); new LinkedList<ItemInfo>();
/** /**
* Map of startPath and number of child's. Number of child's indicate the * Map of startPath and number of child's. Number of child's indicate the
* number of files pending to satisfy the policy. * number of files pending to satisfy the policy.
*/ */
private final Map<T, DirPendingWorkInfo> pendingWorkForDirectory = private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
new HashMap<>(); new HashMap<>();
private final Map<T, StoragePolicySatisfyPathStatusInfo> spsStatus = private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private final Context<T> ctxt; private final Context ctxt;
private Daemon pathIdCollector; private Daemon pathIdCollector;
private FileCollector<T> fileCollector;
private SPSPathIdProcessor pathIDProcessor; private SPSPathIdProcessor pathIDProcessor;
// Amount of time to cache the SUCCESS status of path before turning it to // Amount of time to cache the SUCCESS status of path before turning it to
// NOT_AVAILABLE. // NOT_AVAILABLE.
private static long statusClearanceElapsedTimeMs = 300000; private static long statusClearanceElapsedTimeMs = 300000;
public BlockStorageMovementNeeded(Context<T> context, public BlockStorageMovementNeeded(Context context) {
FileCollector<T> fileCollector) {
this.ctxt = context; this.ctxt = context;
this.fileCollector = fileCollector;
pathIDProcessor = new SPSPathIdProcessor(); pathIDProcessor = new SPSPathIdProcessor();
} }
@ -94,7 +85,7 @@ public class BlockStorageMovementNeeded<T> {
* @param trackInfo * @param trackInfo
* - track info for satisfy the policy * - track info for satisfy the policy
*/ */
public synchronized void add(ItemInfo<T> trackInfo) { public synchronized void add(ItemInfo trackInfo) {
spsStatus.put(trackInfo.getFile(), spsStatus.put(trackInfo.getFile(),
new StoragePolicySatisfyPathStatusInfo( new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS)); StoragePolicySatisfyPathStatus.IN_PROGRESS));
@ -114,7 +105,7 @@ public class BlockStorageMovementNeeded<T> {
* scan. * scan.
*/ */
@VisibleForTesting @VisibleForTesting
public synchronized void addAll(T startPath, List<ItemInfo<T>> itemInfoList, public synchronized void addAll(long startPath, List<ItemInfo> itemInfoList,
boolean scanCompleted) { boolean scanCompleted) {
storageMovementNeeded.addAll(itemInfoList); storageMovementNeeded.addAll(itemInfoList);
updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted); updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted);
@ -131,7 +122,7 @@ public class BlockStorageMovementNeeded<T> {
* elements to scan. * elements to scan.
*/ */
@VisibleForTesting @VisibleForTesting
public synchronized void add(ItemInfo<T> itemInfo, boolean scanCompleted) { public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
storageMovementNeeded.add(itemInfo); storageMovementNeeded.add(itemInfo);
// This represents sps start id is file, so no need to update pending dir // This represents sps start id is file, so no need to update pending dir
// stats. // stats.
@ -141,7 +132,7 @@ public class BlockStorageMovementNeeded<T> {
updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted); updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted);
} }
private void updatePendingDirScanStats(T startPath, int numScannedFiles, private void updatePendingDirScanStats(long startPath, int numScannedFiles,
boolean scanCompleted) { boolean scanCompleted) {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath); DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath);
if (pendingWork == null) { if (pendingWork == null) {
@ -160,7 +151,7 @@ public class BlockStorageMovementNeeded<T> {
* *
* @return satisfier files * @return satisfier files
*/ */
public synchronized ItemInfo<T> get() { public synchronized ItemInfo get() {
return storageMovementNeeded.poll(); return storageMovementNeeded.poll();
} }
@ -181,12 +172,12 @@ public class BlockStorageMovementNeeded<T> {
* Decrease the pending child count for directory once one file blocks moved * Decrease the pending child count for directory once one file blocks moved
* successfully. Remove the SPS xAttr if pending child count is zero. * successfully. Remove the SPS xAttr if pending child count is zero.
*/ */
public synchronized void removeItemTrackInfo(ItemInfo<T> trackInfo, public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
boolean isSuccess) throws IOException { boolean isSuccess) throws IOException {
if (trackInfo.isDir()) { if (trackInfo.isDir()) {
// If track is part of some start inode then reduce the pending // If track is part of some start inode then reduce the pending
// directory work count. // directory work count.
T startId = trackInfo.getStartPath(); long startId = trackInfo.getStartPath();
if (!ctxt.isFileExist(startId)) { if (!ctxt.isFileExist(startId)) {
// directory deleted just remove it. // directory deleted just remove it.
this.pendingWorkForDirectory.remove(startId); this.pendingWorkForDirectory.remove(startId);
@ -212,11 +203,11 @@ public class BlockStorageMovementNeeded<T> {
} }
} }
public synchronized void clearQueue(T trackId) { public synchronized void clearQueue(long trackId) {
ctxt.removeSPSPathId(trackId); ctxt.removeSPSPathId(trackId);
Iterator<ItemInfo<T>> iterator = storageMovementNeeded.iterator(); Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
ItemInfo<T> next = iterator.next(); ItemInfo next = iterator.next();
if (next.getFile() == trackId) { if (next.getFile() == trackId) {
iterator.remove(); iterator.remove();
} }
@ -227,7 +218,7 @@ public class BlockStorageMovementNeeded<T> {
/** /**
* Mark inode status as SUCCESS in map. * Mark inode status as SUCCESS in map.
*/ */
private void updateStatus(T startId, boolean isSuccess){ private void updateStatus(long startId, boolean isSuccess){
StoragePolicySatisfyPathStatusInfo spsStatusInfo = StoragePolicySatisfyPathStatusInfo spsStatusInfo =
spsStatus.get(startId); spsStatus.get(startId);
if (spsStatusInfo == null) { if (spsStatusInfo == null) {
@ -249,7 +240,7 @@ public class BlockStorageMovementNeeded<T> {
*/ */
public synchronized void clearQueuesWithNotification() { public synchronized void clearQueuesWithNotification() {
// Remove xAttr from directories // Remove xAttr from directories
T trackId; Long trackId;
while ((trackId = ctxt.getNextSPSPath()) != null) { while ((trackId = ctxt.getNextSPSPath()) != null) {
try { try {
// Remove xAttr for file // Remove xAttr for file
@ -261,7 +252,7 @@ public class BlockStorageMovementNeeded<T> {
// File's directly added to storageMovementNeeded, So try to remove // File's directly added to storageMovementNeeded, So try to remove
// xAttr for file // xAttr for file
ItemInfo<T> itemInfo; ItemInfo itemInfo;
while ((itemInfo = get()) != null) { while ((itemInfo = get()) != null) {
try { try {
// Remove xAttr for file // Remove xAttr for file
@ -287,7 +278,7 @@ public class BlockStorageMovementNeeded<T> {
public void run() { public void run() {
LOG.info("Starting SPSPathIdProcessor!."); LOG.info("Starting SPSPathIdProcessor!.");
long lastStatusCleanTime = 0; long lastStatusCleanTime = 0;
T startINode = null; Long startINode = null;
while (ctxt.isRunning()) { while (ctxt.isRunning()) {
try { try {
if (!ctxt.isInSafeMode()) { if (!ctxt.isInSafeMode()) {
@ -301,7 +292,7 @@ public class BlockStorageMovementNeeded<T> {
spsStatus.put(startINode, spsStatus.put(startINode,
new StoragePolicySatisfyPathStatusInfo( new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS)); StoragePolicySatisfyPathStatus.IN_PROGRESS));
fileCollector.scanAndCollectFiles(startINode); ctxt.scanAndCollectFiles(startINode);
// check if directory was empty and no child added to queue // check if directory was empty and no child added to queue
DirPendingWorkInfo dirPendingWorkInfo = DirPendingWorkInfo dirPendingWorkInfo =
pendingWorkForDirectory.get(startINode); pendingWorkForDirectory.get(startINode);
@ -339,9 +330,9 @@ public class BlockStorageMovementNeeded<T> {
} }
private synchronized void cleanSPSStatus() { private synchronized void cleanSPSStatus() {
for (Iterator<Entry<T, StoragePolicySatisfyPathStatusInfo>> it = spsStatus for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
.entrySet().iterator(); it.hasNext();) { spsStatus.entrySet().iterator(); it.hasNext();) {
Entry<T, StoragePolicySatisfyPathStatusInfo> entry = it.next(); Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
if (entry.getValue().canRemove()) { if (entry.getValue().canRemove()) {
it.remove(); it.remove();
} }
@ -477,7 +468,7 @@ public class BlockStorageMovementNeeded<T> {
return statusClearanceElapsedTimeMs; return statusClearanceElapsedTimeMs;
} }
public void markScanCompletedForDir(T inode) { public void markScanCompletedForDir(long inode) {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode); DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode);
if (pendingWork != null) { if (pendingWork != null) {
pendingWork.markScanCompleted(); pendingWork.markScanCompleted();

View File

@ -24,24 +24,21 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
/** /**
* An interface for the communication between SPS and Namenode module. * An interface for the communication between SPS and Namenode module.
*
* @param <T>
* is identifier of inode or full path name of inode. Internal sps will
* use the file inodeId for the block movement. External sps will use
* file string path representation for the block movement.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface Context<T> { public interface Context {
/** /**
* Returns true if the SPS is running, false otherwise. * Returns true if the SPS is running, false otherwise.
@ -85,7 +82,7 @@ public interface Context<T> {
* - file info * - file info
* @return true if the given file exists, false otherwise. * @return true if the given file exists, false otherwise.
*/ */
boolean isFileExist(T filePath); boolean isFileExist(long filePath);
/** /**
* Gets the storage policy details for the given policy ID. * Gets the storage policy details for the given policy ID.
@ -108,7 +105,7 @@ public interface Context<T> {
* - user invoked satisfier path * - user invoked satisfier path
* @throws IOException * @throws IOException
*/ */
void removeSPSHint(T spsPath) throws IOException; void removeSPSHint(long spsPath) throws IOException;
/** /**
* Gets the number of live datanodes in the cluster. * Gets the number of live datanodes in the cluster.
@ -124,7 +121,7 @@ public interface Context<T> {
* file path * file path
* @return file status metadata information * @return file status metadata information
*/ */
HdfsFileStatus getFileInfo(T file) throws IOException; HdfsFileStatus getFileInfo(long file) throws IOException;
/** /**
* Returns all the live datanodes and its storage details. * Returns all the live datanodes and its storage details.
@ -137,15 +134,41 @@ public interface Context<T> {
/** /**
* @return next SPS path info to process. * @return next SPS path info to process.
*/ */
T getNextSPSPath(); Long getNextSPSPath();
/** /**
* Removes the SPS path id. * Removes the SPS path id.
*/ */
void removeSPSPathId(T pathId); void removeSPSPathId(long pathId);
/** /**
* Removes all SPS path ids. * Removes all SPS path ids.
*/ */
void removeAllSPSPathIds(); void removeAllSPSPathIds();
/**
* Do scan and collects the files under that directory and adds to the given
* BlockStorageMovementNeeded.
*
* @param filePath
* file path
*/
void scanAndCollectFiles(long filePath)
throws IOException, InterruptedException;
/**
* Handles the block move tasks. BlockMovingInfo must contain the required
* info to move the block, that source location, destination location and
* storage types.
*/
void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException;
/**
* This can be used to notify to the SPS about block movement attempt
* finished. Then SPS will re-check whether it needs retry or not.
*
* @param moveAttemptFinishedBlks
* list of movement attempt finished blocks
*/
void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks);
} }

View File

@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
* interval. * interval.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class DatanodeCacheManager<T> { public class DatanodeCacheManager {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(DatanodeCacheManager.class); .getLogger(DatanodeCacheManager.class);
@ -78,7 +78,7 @@ public class DatanodeCacheManager<T> {
* @throws IOException * @throws IOException
*/ */
public DatanodeMap getLiveDatanodeStorageReport( public DatanodeMap getLiveDatanodeStorageReport(
Context<T> spsContext) throws IOException { Context spsContext) throws IOException {
long now = Time.monotonicNow(); long now = Time.monotonicNow();
long elapsedTimeMs = now - lastAccessedTime; long elapsedTimeMs = now - lastAccessedTime;
boolean refreshNeeded = elapsedTimeMs >= refreshIntervalMs; boolean refreshNeeded = elapsedTimeMs >= refreshIntervalMs;

View File

@ -26,23 +26,18 @@ import org.apache.hadoop.classification.InterfaceStability;
/** /**
* An interface for scanning the directory recursively and collect files * An interface for scanning the directory recursively and collect files
* under the given directory. * under the given directory.
*
* @param <T>
* is identifier of inode or full path name of inode. Internal sps will
* use the file inodeId for the block movement. External sps will use
* file string path representation for the block movement.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface FileCollector<T> { public interface FileCollector {
/** /**
* This method can be used to scan and collects the files under that * This method can be used to scan and collects the files under that
* directory and adds to the given BlockStorageMovementNeeded. * directory and adds to the given BlockStorageMovementNeeded.
* *
* @param filePath * @param path
* - file path * - file path id
*/ */
void scanAndCollectFiles(T filePath) void scanAndCollectFiles(long path)
throws IOException, InterruptedException; throws IOException, InterruptedException;
} }

View File

@ -20,11 +20,14 @@ package org.apache.hadoop.hdfs.server.namenode.sps;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -32,6 +35,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
@ -45,20 +49,26 @@ import org.slf4j.LoggerFactory;
* movements to satisfy the storage policy. * movements to satisfy the storage policy.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class IntraSPSNameNodeContext implements Context<Long> { public class IntraSPSNameNodeContext implements Context {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(IntraSPSNameNodeContext.class); .getLogger(IntraSPSNameNodeContext.class);
private final Namesystem namesystem; private final Namesystem namesystem;
private final BlockManager blockManager; private final BlockManager blockManager;
private SPSService<Long> service; private SPSService service;
private final FileCollector fileCollector;
private final BlockMoveTaskHandler blockMoveTaskHandler;
public IntraSPSNameNodeContext(Namesystem namesystem, public IntraSPSNameNodeContext(Namesystem namesystem,
BlockManager blockManager, SPSService<Long> service) { BlockManager blockManager, SPSService service) {
this.namesystem = namesystem; this.namesystem = namesystem;
this.blockManager = blockManager; this.blockManager = blockManager;
this.service = service; this.service = service;
fileCollector = new IntraSPSNameNodeFileIdCollector(
namesystem.getFSDirectory(), service);
blockMoveTaskHandler = new IntraSPSNameNodeBlockMoveTaskHandler(
blockManager, namesystem);
} }
@Override @Override
@ -67,17 +77,12 @@ public class IntraSPSNameNodeContext implements Context<Long> {
} }
/** /**
* @return object containing information regarding the file or null if file * @return object containing information regarding the file.
* not found.
*/ */
@Override @Override
public HdfsFileStatus getFileInfo(Long inodeID) throws IOException { public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
String filePath = namesystem.getFilePath(inodeID); Path filePath = DFSUtilClient.makePathFromFileId(inodeID);
if (StringUtils.isBlank(filePath)) { return namesystem.getFileInfo(filePath.toString(), true, true);
LOG.debug("File with inodeID:{} doesn't exists!", inodeID);
return null;
}
return namesystem.getFileInfo(filePath, true, true);
} }
@Override @Override
@ -93,12 +98,12 @@ public class IntraSPSNameNodeContext implements Context<Long> {
} }
@Override @Override
public boolean isFileExist(Long inodeId) { public boolean isFileExist(long inodeId) {
return namesystem.getFSDirectory().getInode(inodeId) != null; return namesystem.getFSDirectory().getInode(inodeId) != null;
} }
@Override @Override
public void removeSPSHint(Long inodeId) throws IOException { public void removeSPSHint(long inodeId) throws IOException {
this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY); this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
} }
@ -156,7 +161,7 @@ public class IntraSPSNameNodeContext implements Context<Long> {
} }
@Override @Override
public void removeSPSPathId(Long trackId) { public void removeSPSPathId(long trackId) {
blockManager.getSPSManager().removePathId(trackId); blockManager.getSPSManager().removePathId(trackId);
} }
@ -164,4 +169,21 @@ public class IntraSPSNameNodeContext implements Context<Long> {
public void removeAllSPSPathIds() { public void removeAllSPSPathIds() {
blockManager.getSPSManager().removeAllPathIds(); blockManager.getSPSManager().removeAllPathIds();
} }
@Override
public void scanAndCollectFiles(long filePath)
throws IOException, InterruptedException {
fileCollector.scanAndCollectFiles(filePath);
}
@Override
public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
}
@Override
public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
LOG.info("Movement attempted blocks: {}",
Arrays.asList(moveAttemptFinishedBlks));
}
} }

View File

@ -35,16 +35,16 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
implements FileCollector<Long> { implements FileCollector {
private int maxQueueLimitToScan; private int maxQueueLimitToScan;
private final SPSService <Long> service; private final SPSService service;
private int remainingCapacity = 0; private int remainingCapacity = 0;
private List<ItemInfo<Long>> currentBatch; private List<ItemInfo> currentBatch;
public IntraSPSNameNodeFileIdCollector(FSDirectory dir, public IntraSPSNameNodeFileIdCollector(FSDirectory dir,
SPSService<Long> service) { SPSService service) {
super(dir); super(dir);
this.service = service; this.service = service;
this.maxQueueLimitToScan = service.getConf().getInt( this.maxQueueLimitToScan = service.getConf().getInt(
@ -64,7 +64,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
return false; return false;
} }
if (inode.isFile() && inode.asFile().numBlocks() != 0) { if (inode.isFile() && inode.asFile().numBlocks() != 0) {
currentBatch.add(new ItemInfo<Long>( currentBatch.add(new ItemInfo(
((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId())); ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
remainingCapacity--; remainingCapacity--;
} }
@ -120,7 +120,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
} }
@Override @Override
public void scanAndCollectFiles(final Long startINodeId) public void scanAndCollectFiles(final long startINodeId)
throws IOException, InterruptedException { throws IOException, InterruptedException {
FSDirectory fsd = getFSDirectory(); FSDirectory fsd = getFSDirectory();
INode startInode = fsd.getInode(startINodeId); INode startInode = fsd.getInode(startINodeId);
@ -131,7 +131,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
} }
if (startInode.isFile()) { if (startInode.isFile()) {
currentBatch currentBatch
.add(new ItemInfo<Long>(startInode.getId(), startInode.getId())); .add(new ItemInfo(startInode.getId(), startInode.getId()));
} else { } else {
readLock(); readLock();
// NOTE: this lock will not be held for full directory scanning. It is // NOTE: this lock will not be held for full directory scanning. It is

View File

@ -21,28 +21,26 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
/** /**
* ItemInfo is a file info object for which need to satisfy the policy. For * ItemInfo is a file info object for which need to satisfy the policy.
* internal satisfier service, it uses inode id which is Long datatype. For the
* external satisfier service, it uses the full string representation of the
* path.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ItemInfo<T> { public class ItemInfo {
private T startPath; private long startPathId;
private T file; private long fileId;
private int retryCount; private int retryCount;
public ItemInfo(T startPath, T file) { public ItemInfo(long startPathId, long fileId) {
this.startPath = startPath; this.startPathId = startPathId;
this.file = file; this.fileId = fileId;
// set 0 when item is getting added first time in queue. // set 0 when item is getting added first time in queue.
this.retryCount = 0; this.retryCount = 0;
} }
public ItemInfo(final T startPath, final T file, final int retryCount) { public ItemInfo(final long startPathId, final long fileId,
this.startPath = startPath; final int retryCount) {
this.file = file; this.startPathId = startPathId;
this.fileId = fileId;
this.retryCount = retryCount; this.retryCount = retryCount;
} }
@ -50,22 +48,22 @@ public class ItemInfo<T> {
* Returns the start path of the current file. This indicates that SPS * Returns the start path of the current file. This indicates that SPS
* was invoked on this path. * was invoked on this path.
*/ */
public T getStartPath() { public long getStartPath() {
return startPath; return startPathId;
} }
/** /**
* Returns the file for which needs to satisfy the policy. * Returns the file for which needs to satisfy the policy.
*/ */
public T getFile() { public long getFile() {
return file; return fileId;
} }
/** /**
* Returns true if the tracking path is a directory, false otherwise. * Returns true if the tracking path is a directory, false otherwise.
*/ */
public boolean isDir() { public boolean isDir() {
return !startPath.equals(file); return !(startPathId == fileId);
} }
/** /**

View File

@ -29,15 +29,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
/** /**
* An interface for SPSService, which exposes life cycle and processing APIs. * An interface for SPSService, which exposes life cycle and processing APIs.
*
* @param <T>
* is identifier of inode or full path name of inode. Internal sps will
* use the file inodeId for the block movement. External sps will use
* file string path representation for the block movement.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface SPSService<T> { public interface SPSService {
/** /**
* Initializes the helper services. * Initializes the helper services.
@ -45,16 +40,8 @@ public interface SPSService<T> {
* @param ctxt * @param ctxt
* - context is an helper service to provide communication channel * - context is an helper service to provide communication channel
* between NN and SPS * between NN and SPS
* @param fileCollector
* - a helper service for scanning the files under a given directory
* id
* @param handler
* - a helper service for moving the blocks
* @param blkMovementListener
* - listener to know about block movement attempt completion
*/ */
void init(Context<T> ctxt, FileCollector<T> fileCollector, void init(Context ctxt);
BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener);
/** /**
* Starts the SPS service. Make sure to initialize the helper services before * Starts the SPS service. Make sure to initialize the helper services before
@ -94,19 +81,19 @@ public interface SPSService<T> {
* @param itemInfo * @param itemInfo
* file info object for which need to satisfy the policy * file info object for which need to satisfy the policy
*/ */
void addFileToProcess(ItemInfo<T> itemInfo, boolean scanCompleted); void addFileToProcess(ItemInfo itemInfo, boolean scanCompleted);
/** /**
* Adds all the Item information(file etc) to processing queue. * Adds all the Item information(file etc) to processing queue.
* *
* @param startPath * @param startPathId
* - directory/file, on which SPS was called. * - directoryId/fileId, on which SPS was called.
* @param itemInfoList * @param itemInfoList
* - list of item infos * - list of item infos
* @param scanCompleted * @param scanCompleted
* - whether the scanning of directory fully done with itemInfoList * - whether the scanning of directory fully done with itemInfoList
*/ */
void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList, void addAllFilesToProcess(long startPathId, List<ItemInfo> itemInfoList,
boolean scanCompleted); boolean scanCompleted);
/** /**
@ -117,7 +104,7 @@ public interface SPSService<T> {
/** /**
* Clear inodeId present in the processing queue. * Clear inodeId present in the processing queue.
*/ */
void clearQueue(T spsPath); void clearQueue(long spsPath);
/** /**
* @return the configuration. * @return the configuration.
@ -128,9 +115,9 @@ public interface SPSService<T> {
* Marks the scanning of directory if finished. * Marks the scanning of directory if finished.
* *
* @param spsPath * @param spsPath
* - satisfier path * - satisfier path id
*/ */
void markScanCompletedForPath(T spsPath); void markScanCompletedForPath(long spsPath);
/** /**
* Given node is reporting that it received a certain movement attempt * Given node is reporting that it received a certain movement attempt

View File

@ -78,20 +78,19 @@ import com.google.common.base.Preconditions;
* physical block movements. * physical block movements.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { public class StoragePolicySatisfier implements SPSService, Runnable {
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(StoragePolicySatisfier.class); LoggerFactory.getLogger(StoragePolicySatisfier.class);
private Daemon storagePolicySatisfierThread; private Daemon storagePolicySatisfierThread;
private BlockStorageMovementNeeded<T> storageMovementNeeded; private BlockStorageMovementNeeded storageMovementNeeded;
private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor; private BlockStorageMovementAttemptedItems storageMovementsMonitor;
private volatile boolean isRunning = false; private volatile boolean isRunning = false;
private int spsWorkMultiplier; private int spsWorkMultiplier;
private long blockCount = 0L; private long blockCount = 0L;
private int blockMovementMaxRetry; private int blockMovementMaxRetry;
private Context<T> ctxt; private Context ctxt;
private BlockMoveTaskHandler blockMoveTaskHandler;
private final Configuration conf; private final Configuration conf;
private DatanodeCacheManager<T> dnCacheMgr; private DatanodeCacheManager dnCacheMgr;
public StoragePolicySatisfier(Configuration conf) { public StoragePolicySatisfier(Configuration conf) {
this.conf = conf; this.conf = conf;
@ -137,16 +136,11 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
} }
} }
public void init(final Context<T> context, public void init(final Context context) {
final FileCollector<T> fileIDCollector,
final BlockMoveTaskHandler blockMovementTaskHandler,
final BlockMovementListener blockMovementListener) {
this.ctxt = context; this.ctxt = context;
this.storageMovementNeeded = new BlockStorageMovementNeeded<T>(context, this.storageMovementNeeded = new BlockStorageMovementNeeded(context);
fileIDCollector); this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems<T>( this, storageMovementNeeded, context);
this, storageMovementNeeded, blockMovementListener);
this.blockMoveTaskHandler = blockMovementTaskHandler;
this.spsWorkMultiplier = getSPSWorkMultiplier(getConf()); this.spsWorkMultiplier = getSPSWorkMultiplier(getConf());
this.blockMovementMaxRetry = getConf().getInt( this.blockMovementMaxRetry = getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
@ -191,7 +185,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
storagePolicySatisfierThread.start(); storagePolicySatisfierThread.start();
this.storageMovementsMonitor.start(); this.storageMovementsMonitor.start();
this.storageMovementNeeded.activate(); this.storageMovementNeeded.activate();
dnCacheMgr = new DatanodeCacheManager<T>(conf); dnCacheMgr = new DatanodeCacheManager(conf);
} }
@Override @Override
@ -259,7 +253,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
continue; continue;
} }
try { try {
ItemInfo<T> itemInfo = null; ItemInfo itemInfo = null;
boolean retryItem = false; boolean retryItem = false;
if (!ctxt.isInSafeMode()) { if (!ctxt.isInSafeMode()) {
itemInfo = storageMovementNeeded.get(); itemInfo = storageMovementNeeded.get();
@ -271,7 +265,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
storageMovementNeeded.removeItemTrackInfo(itemInfo, false); storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
continue; continue;
} }
T trackId = itemInfo.getFile(); long trackId = itemInfo.getFile();
BlocksMovingAnalysis status = null; BlocksMovingAnalysis status = null;
BlockStoragePolicy existingStoragePolicy; BlockStoragePolicy existingStoragePolicy;
// TODO: presently, context internally acquire the lock // TODO: presently, context internally acquire the lock
@ -353,7 +347,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
blockCount = 0L; blockCount = 0L;
} }
if (retryItem) { if (retryItem) {
itemInfo.increRetryCount(); // itemInfo.increRetryCount();
this.storageMovementNeeded.add(itemInfo); this.storageMovementNeeded.add(itemInfo);
} }
} catch (IOException e) { } catch (IOException e) {
@ -469,7 +463,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for at least one block storage movement has been chosen // Check for at least one block storage movement has been chosen
try { try {
blockMoveTaskHandler.submitMoveTask(blkMovingInfo); ctxt.submitMoveTask(blkMovingInfo);
LOG.debug("BlockMovingInfo: {}", blkMovingInfo); LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
StorageTypeNodePair nodeStorage = new StorageTypeNodePair( StorageTypeNodePair nodeStorage = new StorageTypeNodePair(
blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget()); blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget());
@ -1092,7 +1086,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
} }
@VisibleForTesting @VisibleForTesting
public BlockStorageMovementAttemptedItems<T> getAttemptedItemsMonitor() { public BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
return storageMovementsMonitor; return storageMovementsMonitor;
} }
@ -1109,7 +1103,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
/** /**
* Clear queues for given track id. * Clear queues for given track id.
*/ */
public void clearQueue(T trackId) { public void clearQueue(long trackId) {
storageMovementNeeded.clearQueue(trackId); storageMovementNeeded.clearQueue(trackId);
} }
@ -1118,7 +1112,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
* attempted or reported time stamp. This is used by * attempted or reported time stamp. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}. * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
*/ */
final static class AttemptedItemInfo<T> extends ItemInfo<T> { final static class AttemptedItemInfo extends ItemInfo {
private long lastAttemptedOrReportedTime; private long lastAttemptedOrReportedTime;
private final Set<Block> blocks; private final Set<Block> blocks;
@ -1136,7 +1130,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
* @param retryCount * @param retryCount
* file retry count * file retry count
*/ */
AttemptedItemInfo(T rootId, T trackId, AttemptedItemInfo(long rootId, long trackId,
long lastAttemptedOrReportedTime, long lastAttemptedOrReportedTime,
Set<Block> blocks, int retryCount) { Set<Block> blocks, int retryCount) {
super(rootId, trackId, retryCount); super(rootId, trackId, retryCount);
@ -1179,7 +1173,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
} }
@Override @Override
public void addFileToProcess(ItemInfo<T> trackInfo, boolean scanCompleted) { public void addFileToProcess(ItemInfo trackInfo, boolean scanCompleted) {
storageMovementNeeded.add(trackInfo, scanCompleted); storageMovementNeeded.add(trackInfo, scanCompleted);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Added track info for inode {} to block " LOG.debug("Added track info for inode {} to block "
@ -1188,7 +1182,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
} }
@Override @Override
public void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList, public void addAllFilesToProcess(long startPath, List<ItemInfo> itemInfoList,
boolean scanCompleted) { boolean scanCompleted) {
getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted); getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted);
} }
@ -1204,12 +1198,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
} }
@VisibleForTesting @VisibleForTesting
public BlockStorageMovementNeeded<T> getStorageMovementQueue() { public BlockStorageMovementNeeded getStorageMovementQueue() {
return storageMovementNeeded; return storageMovementNeeded;
} }
@Override @Override
public void markScanCompletedForPath(T inodeId) { public void markScanCompletedForPath(long inodeId) {
getStorageMovementQueue().markScanCompletedForDir(inodeId); getStorageMovementQueue().markScanCompletedForDir(inodeId);
} }
@ -1278,15 +1272,4 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
"It should be a positive, non-zero integer value."); "It should be a positive, non-zero integer value.");
return spsWorkMultiplier; return spsWorkMultiplier;
} }
/**
* Sets external listener for testing.
*
* @param blkMovementListener
* block movement listener callback object
*/
@VisibleForTesting
void setBlockMovementListener(BlockMovementListener blkMovementListener) {
storageMovementsMonitor.setBlockMovementListener(blkMovementListener);
}
} }

View File

@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
public class StoragePolicySatisfyManager { public class StoragePolicySatisfyManager {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(StoragePolicySatisfyManager.class); .getLogger(StoragePolicySatisfyManager.class);
private final StoragePolicySatisfier<Long> spsService; private final StoragePolicySatisfier spsService;
private final boolean storagePolicyEnabled; private final boolean storagePolicyEnabled;
private volatile StoragePolicySatisfierMode mode; private volatile StoragePolicySatisfierMode mode;
private final Queue<Long> pathsToBeTraveresed; private final Queue<Long> pathsToBeTraveresed;
@ -84,7 +84,7 @@ public class StoragePolicySatisfyManager {
pathsToBeTraveresed = new LinkedList<Long>(); pathsToBeTraveresed = new LinkedList<Long>();
// instantiate SPS service by just keeps config reference and not starting // instantiate SPS service by just keeps config reference and not starting
// any supporting threads. // any supporting threads.
spsService = new StoragePolicySatisfier<Long>(conf); spsService = new StoragePolicySatisfier(conf);
this.namesystem = namesystem; this.namesystem = namesystem;
this.blkMgr = blkMgr; this.blkMgr = blkMgr;
} }
@ -121,10 +121,7 @@ public class StoragePolicySatisfyManager {
} }
// starts internal daemon service inside namenode // starts internal daemon service inside namenode
spsService.init( spsService.init(
new IntraSPSNameNodeContext(namesystem, blkMgr, spsService), new IntraSPSNameNodeContext(namesystem, blkMgr, spsService));
new IntraSPSNameNodeFileIdCollector(namesystem.getFSDirectory(),
spsService),
new IntraSPSNameNodeBlockMoveTaskHandler(blkMgr, namesystem), null);
spsService.start(false, mode); spsService.start(false, mode);
break; break;
case EXTERNAL: case EXTERNAL:
@ -221,13 +218,8 @@ public class StoragePolicySatisfyManager {
mode); mode);
return; return;
} }
spsService.init( spsService.init(new IntraSPSNameNodeContext(this.namesystem, this.blkMgr,
new IntraSPSNameNodeContext(this.namesystem, this.blkMgr, spsService), spsService));
new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(),
spsService),
new IntraSPSNameNodeBlockMoveTaskHandler(this.blkMgr,
this.namesystem),
null);
spsService.start(true, newMode); spsService.start(true, newMode);
break; break;
case EXTERNAL: case EXTERNAL:
@ -309,7 +301,7 @@ public class StoragePolicySatisfyManager {
/** /**
* @return internal SPS service instance. * @return internal SPS service instance.
*/ */
public SPSService<Long> getInternalSPSService() { public SPSService getInternalSPSService() {
return this.spsService; return this.spsService;
} }

View File

@ -209,6 +209,6 @@ public interface NamenodeProtocol {
* by External SPS. * by External SPS.
*/ */
@AtMostOnce @AtMostOnce
String getNextSPSPath() throws IOException; Long getNextSPSPath() throws IOException;
} }

View File

@ -76,11 +76,11 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
private final SaslDataTransferClient saslClient; private final SaslDataTransferClient saslClient;
private final BlockStorageMovementTracker blkMovementTracker; private final BlockStorageMovementTracker blkMovementTracker;
private Daemon movementTrackerThread; private Daemon movementTrackerThread;
private final SPSService<String> service; private final SPSService service;
private final BlockDispatcher blkDispatcher; private final BlockDispatcher blkDispatcher;
public ExternalSPSBlockMoveTaskHandler(Configuration conf, public ExternalSPSBlockMoveTaskHandler(Configuration conf,
NameNodeConnector nnc, SPSService<String> spsService) { NameNodeConnector nnc, SPSService spsService) {
int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
moveExecutor = initializeBlockMoverThreadPool(moverThreads); moveExecutor = initializeBlockMoverThreadPool(moverThreads);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.sps;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -27,6 +28,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -34,10 +37,14 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.Context; import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
@ -49,17 +56,24 @@ import org.slf4j.LoggerFactory;
* SPS from Namenode state. * SPS from Namenode state.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ExternalSPSContext implements Context<String> { public class ExternalSPSContext implements Context {
public static final Logger LOG = public static final Logger LOG = LoggerFactory
LoggerFactory.getLogger(ExternalSPSContext.class); .getLogger(ExternalSPSContext.class);
private SPSService<String> service; private final SPSService service;
private NameNodeConnector nnc = null; private final NameNodeConnector nnc;
private BlockStoragePolicySuite createDefaultSuite = private final BlockStoragePolicySuite createDefaultSuite =
BlockStoragePolicySuite.createDefaultSuite(); BlockStoragePolicySuite.createDefaultSuite();
private final FileCollector fileCollector;
private final BlockMoveTaskHandler externalHandler;
private final BlockMovementListener blkMovementListener;
public ExternalSPSContext(SPSService<String> service, NameNodeConnector nnc) { public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
this.service = service; this.service = service;
this.nnc = nnc; this.nnc = nnc;
this.fileCollector = new ExternalSPSFilePathCollector(service);
this.externalHandler = new ExternalSPSBlockMoveTaskHandler(
service.getConf(), nnc, service);
this.blkMovementListener = new ExternalBlockMovementListener();
} }
@Override @Override
@ -119,9 +133,10 @@ public class ExternalSPSContext implements Context<String> {
} }
@Override @Override
public boolean isFileExist(String filePath) { public boolean isFileExist(long path) {
Path filePath = DFSUtilClient.makePathFromFileId(path);
try { try {
return nnc.getDistributedFileSystem().exists(new Path(filePath)); return nnc.getDistributedFileSystem().exists(filePath);
} catch (IllegalArgumentException | IOException e) { } catch (IllegalArgumentException | IOException e) {
LOG.warn("Exception while getting file is for the given path:{}", LOG.warn("Exception while getting file is for the given path:{}",
filePath, e); filePath, e);
@ -140,8 +155,9 @@ public class ExternalSPSContext implements Context<String> {
} }
@Override @Override
public void removeSPSHint(String inodeId) throws IOException { public void removeSPSHint(long inodeId) throws IOException {
nnc.getDistributedFileSystem().removeXAttr(new Path(inodeId), Path filePath = DFSUtilClient.makePathFromFileId(inodeId);
nnc.getDistributedFileSystem().removeXAttr(filePath,
HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY); HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
} }
@ -157,11 +173,12 @@ public class ExternalSPSContext implements Context<String> {
} }
@Override @Override
public HdfsFileStatus getFileInfo(String path) throws IOException { public HdfsFileStatus getFileInfo(long path) throws IOException {
HdfsLocatedFileStatus fileInfo = null; HdfsLocatedFileStatus fileInfo = null;
try { try {
Path filePath = DFSUtilClient.makePathFromFileId(path);
fileInfo = nnc.getDistributedFileSystem().getClient() fileInfo = nnc.getDistributedFileSystem().getClient()
.getLocatedFileInfo(path, false); .getLocatedFileInfo(filePath.toString(), false);
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
LOG.debug("Path:{} doesn't exists!", path, e); LOG.debug("Path:{} doesn't exists!", path, e);
} }
@ -175,7 +192,7 @@ public class ExternalSPSContext implements Context<String> {
} }
@Override @Override
public String getNextSPSPath() { public Long getNextSPSPath() {
try { try {
return nnc.getNNProtocolConnection().getNextSPSPath(); return nnc.getNNProtocolConnection().getNextSPSPath();
} catch (IOException e) { } catch (IOException e) {
@ -185,7 +202,7 @@ public class ExternalSPSContext implements Context<String> {
} }
@Override @Override
public void removeSPSPathId(String pathId) { public void removeSPSPathId(long pathId) {
// We need not specifically implement for external. // We need not specifically implement for external.
} }
@ -193,4 +210,40 @@ public class ExternalSPSContext implements Context<String> {
public void removeAllSPSPathIds() { public void removeAllSPSPathIds() {
// We need not specifically implement for external. // We need not specifically implement for external.
} }
@Override
public void scanAndCollectFiles(long path)
throws IOException, InterruptedException {
fileCollector.scanAndCollectFiles(path);
}
@Override
public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
externalHandler.submitMoveTask(blkMovingInfo);
}
@Override
public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
// External listener if it is plugged-in
if (blkMovementListener != null) {
blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks);
}
}
/**
* Its an implementation of BlockMovementListener.
*/
private static class ExternalBlockMovementListener
implements BlockMovementListener {
private List<Block> actualBlockMovements = new ArrayList<>();
@Override
public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
for (Block block : moveAttemptFinishedBlks) {
actualBlockMovements.add(block);
}
LOG.info("Movement attempted blocks", actualBlockMovements);
}
}
} }

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -41,14 +42,14 @@ import org.slf4j.LoggerFactory;
* representation. * representation.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ExternalSPSFilePathCollector implements FileCollector <String>{ public class ExternalSPSFilePathCollector implements FileCollector {
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(ExternalSPSFilePathCollector.class); LoggerFactory.getLogger(ExternalSPSFilePathCollector.class);
private DistributedFileSystem dfs; private DistributedFileSystem dfs;
private SPSService<String> service; private SPSService service;
private int maxQueueLimitToScan; private int maxQueueLimitToScan;
public ExternalSPSFilePathCollector(SPSService<String> service) { public ExternalSPSFilePathCollector(SPSService service) {
this.service = service; this.service = service;
this.maxQueueLimitToScan = service.getConf().getInt( this.maxQueueLimitToScan = service.getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
@ -72,13 +73,13 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
* Recursively scan the given path and add the file info to SPS service for * Recursively scan the given path and add the file info to SPS service for
* processing. * processing.
*/ */
private long processPath(String startID, String childPath) { private long processPath(Long startID, String childPath) {
long pendingWorkCount = 0; // to be satisfied file counter long pendingWorkCount = 0; // to be satisfied file counter
for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) { for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
final DirectoryListing children; final DirectoryListing children;
try { try {
children = dfs.getClient().listPaths(childPath, lastReturnedName, children = dfs.getClient().listPaths(childPath,
false); lastReturnedName, false);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to list directory " + childPath LOG.warn("Failed to list directory " + childPath
+ ". Ignore the directory and continue.", e); + ". Ignore the directory and continue.", e);
@ -93,18 +94,18 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
} }
for (HdfsFileStatus child : children.getPartialListing()) { for (HdfsFileStatus child : children.getPartialListing()) {
String childFullPath = child.getFullName(childPath);
if (child.isFile()) { if (child.isFile()) {
service.addFileToProcess( service.addFileToProcess(new ItemInfo(startID, child.getFileId()),
new ItemInfo<String>(startID, childFullPath), false); false);
checkProcessingQueuesFree(); checkProcessingQueuesFree();
pendingWorkCount++; // increment to be satisfied file count pendingWorkCount++; // increment to be satisfied file count
} else { } else {
String childFullPathName = child.getFullName(childPath);
if (child.isDirectory()) { if (child.isDirectory()) {
if (!childFullPath.endsWith(Path.SEPARATOR)) { if (!childFullPathName.endsWith(Path.SEPARATOR)) {
childFullPath = childFullPath + Path.SEPARATOR; childFullPathName = childFullPathName + Path.SEPARATOR;
} }
pendingWorkCount += processPath(startID, childFullPath); pendingWorkCount += processPath(startID, childFullPathName);
} }
} }
} }
@ -150,11 +151,12 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
} }
@Override @Override
public void scanAndCollectFiles(String path) throws IOException { public void scanAndCollectFiles(long pathId) throws IOException {
if (dfs == null) { if (dfs == null) {
dfs = getFS(service.getConf()); dfs = getFS(service.getConf());
} }
long pendingSatisfyItemsCount = processPath(path, path); Path filePath = DFSUtilClient.makePathFromFileId(pathId);
long pendingSatisfyItemsCount = processPath(pathId, filePath.toString());
// Check whether the given path contains any item to be tracked // Check whether the given path contains any item to be tracked
// or the no to be satisfied paths. In case of empty list, add the given // or the no to be satisfied paths. In case of empty list, add the given
// inodeId to the 'pendingWorkForDirectory' with empty list so that later // inodeId to the 'pendingWorkForDirectory' with empty list so that later
@ -162,10 +164,10 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
// this path is already satisfied the storage policy. // this path is already satisfied the storage policy.
if (pendingSatisfyItemsCount <= 0) { if (pendingSatisfyItemsCount <= 0) {
LOG.debug("There is no pending items to satisfy the given path " LOG.debug("There is no pending items to satisfy the given path "
+ "inodeId:{}", path); + "inodeId:{}", pathId);
service.addAllFilesToProcess(path, new ArrayList<>(), true); service.addAllFilesToProcess(pathId, new ArrayList<>(), true);
} else { } else {
service.markScanCompletedForPath(path); service.markScanCompletedForPath(pathId);
} }
} }

View File

@ -22,7 +22,6 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -32,11 +31,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
@ -68,8 +65,7 @@ public final class ExternalStoragePolicySatisfier {
HdfsConfiguration spsConf = new HdfsConfiguration(); HdfsConfiguration spsConf = new HdfsConfiguration();
// login with SPS keytab // login with SPS keytab
secureLogin(spsConf); secureLogin(spsConf);
StoragePolicySatisfier<String> sps = new StoragePolicySatisfier<String>( StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
spsConf);
nnc = getNameNodeConnector(spsConf); nnc = getNameNodeConnector(spsConf);
boolean spsRunning; boolean spsRunning;
@ -82,12 +78,7 @@ public final class ExternalStoragePolicySatisfier {
} }
ExternalSPSContext context = new ExternalSPSContext(sps, nnc); ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
ExternalBlockMovementListener blkMoveListener = sps.init(context);
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler,
blkMoveListener);
sps.start(true, StoragePolicySatisfierMode.EXTERNAL); sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
if (sps != null) { if (sps != null) {
sps.join(); sps.join();
@ -132,21 +123,4 @@ public final class ExternalStoragePolicySatisfier {
} }
} }
} }
/**
* It is implementation of BlockMovementListener.
*/
private static class ExternalBlockMovementListener
implements BlockMovementListener {
private List<Block> actualBlockMovements = new ArrayList<>();
@Override
public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
for (Block block : moveAttemptFinishedBlks) {
actualBlockMovements.add(block);
}
LOG.info("Movement attempted blocks:{}", actualBlockMovements);
}
}
} }

View File

@ -218,7 +218,7 @@ message GetNextSPSPathRequestProto {
} }
message GetNextSPSPathResponseProto { message GetNextSPSPathResponseProto {
optional string spsPath = 1; optional uint64 spsPath = 1;
} }
/** /**

View File

@ -45,22 +45,22 @@ import org.mockito.Mockito;
*/ */
public class TestBlockStorageMovementAttemptedItems { public class TestBlockStorageMovementAttemptedItems {
private BlockStorageMovementAttemptedItems<Long> bsmAttemptedItems; private BlockStorageMovementAttemptedItems bsmAttemptedItems;
private BlockStorageMovementNeeded<Long> unsatisfiedStorageMovementFiles; private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles;
private final int selfRetryTimeout = 500; private final int selfRetryTimeout = 500;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
Configuration config = new HdfsConfiguration(); Configuration config = new HdfsConfiguration();
Context<Long> ctxt = Mockito.mock(IntraSPSNameNodeContext.class); Context ctxt = Mockito.mock(IntraSPSNameNodeContext.class);
SPSService<Long> sps = new StoragePolicySatisfier<Long>(config); SPSService sps = new StoragePolicySatisfier(config);
Mockito.when(ctxt.isRunning()).thenReturn(true); Mockito.when(ctxt.isRunning()).thenReturn(true);
Mockito.when(ctxt.isInSafeMode()).thenReturn(false); Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true); Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
unsatisfiedStorageMovementFiles = unsatisfiedStorageMovementFiles =
new BlockStorageMovementNeeded<Long>(ctxt, null); new BlockStorageMovementNeeded(ctxt);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems<Long>(sps, bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps,
unsatisfiedStorageMovementFiles, null); unsatisfiedStorageMovementFiles, ctxt);
} }
@After @After
@ -76,7 +76,7 @@ public class TestBlockStorageMovementAttemptedItems {
long stopTime = monotonicNow() + (retryTimeout * 2); long stopTime = monotonicNow() + (retryTimeout * 2);
boolean isItemFound = false; boolean isItemFound = false;
while (monotonicNow() < (stopTime)) { while (monotonicNow() < (stopTime)) {
ItemInfo<Long> ele = null; ItemInfo ele = null;
while ((ele = unsatisfiedStorageMovementFiles.get()) != null) { while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
if (item == ele.getFile()) { if (item == ele.getFile()) {
isItemFound = true; isItemFound = true;

View File

@ -108,8 +108,6 @@ public class TestStoragePolicySatisfier {
public static final long CAPACITY = 2 * 256 * 1024 * 1024; public static final long CAPACITY = 2 * 256 * 1024 * 1024;
public static final String FILE = "/testMoveToSatisfyStoragePolicy"; public static final String FILE = "/testMoveToSatisfyStoragePolicy";
public static final int DEFAULT_BLOCK_SIZE = 1024; public static final int DEFAULT_BLOCK_SIZE = 1024;
private ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
/** /**
* Sets hdfs cluster. * Sets hdfs cluster.
@ -1282,8 +1280,8 @@ public class TestStoragePolicySatisfier {
//Queue limit can control the traverse logic to wait for some free //Queue limit can control the traverse logic to wait for some free
//entry in queue. After 10 files, traverse control will be on U. //entry in queue. After 10 files, traverse control will be on U.
StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config); StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
Context<Long> ctxt = new IntraSPSNameNodeContext( Context ctxt = new IntraSPSNameNodeContext(
hdfsCluster.getNamesystem(), hdfsCluster.getNamesystem(),
hdfsCluster.getNamesystem().getBlockManager(), sps) { hdfsCluster.getNamesystem().getBlockManager(), sps) {
@Override @Override
@ -1297,8 +1295,7 @@ public class TestStoragePolicySatisfier {
} }
}; };
FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt); sps.init(ctxt);
sps.init(ctxt, fileIDCollector, null, null);
sps.getStorageMovementQueue().activate(); sps.getStorageMovementQueue().activate();
INode rootINode = fsDir.getINode("/root"); INode rootINode = fsDir.getINode("/root");
@ -1314,13 +1311,6 @@ public class TestStoragePolicySatisfier {
dfs.delete(new Path("/root"), true); dfs.delete(new Path("/root"), true);
} }
public FileCollector<Long> createFileIdCollector(
StoragePolicySatisfier<Long> sps, Context<Long> ctxt) {
FileCollector<Long> fileIDCollector = new IntraSPSNameNodeFileIdCollector(
hdfsCluster.getNamesystem().getFSDirectory(), sps);
return fileIDCollector;
}
/** /**
* Test traverse when root parent got deleted. * Test traverse when root parent got deleted.
* 1. Delete L when traversing Q * 1. Delete L when traversing Q
@ -1351,8 +1341,8 @@ public class TestStoragePolicySatisfier {
// Queue limit can control the traverse logic to wait for some free // Queue limit can control the traverse logic to wait for some free
// entry in queue. After 10 files, traverse control will be on U. // entry in queue. After 10 files, traverse control will be on U.
StoragePolicySatisfier<Long> sps = new StoragePolicySatisfier<Long>(config); StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
Context<Long> ctxt = new IntraSPSNameNodeContext( Context ctxt = new IntraSPSNameNodeContext(
hdfsCluster.getNamesystem(), hdfsCluster.getNamesystem(),
hdfsCluster.getNamesystem().getBlockManager(), sps) { hdfsCluster.getNamesystem().getBlockManager(), sps) {
@Override @Override
@ -1365,8 +1355,7 @@ public class TestStoragePolicySatisfier {
return true; return true;
} }
}; };
FileCollector<Long> fileIDCollector = createFileIdCollector(sps, ctxt); sps.init(ctxt);
sps.init(ctxt, fileIDCollector, null, null);
sps.getStorageMovementQueue().activate(); sps.getStorageMovementQueue().activate();
INode rootINode = fsDir.getINode("/root"); INode rootINode = fsDir.getINode("/root");
@ -1383,12 +1372,12 @@ public class TestStoragePolicySatisfier {
} }
private void assertTraversal(List<String> expectedTraverseOrder, private void assertTraversal(List<String> expectedTraverseOrder,
FSDirectory fsDir, StoragePolicySatisfier<Long> sps) FSDirectory fsDir, StoragePolicySatisfier sps)
throws InterruptedException { throws InterruptedException {
// Remove 10 element and make queue free, So other traversing will start. // Remove 10 element and make queue free, So other traversing will start.
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0); String path = expectedTraverseOrder.remove(0);
ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get(); ItemInfo itemInfo = sps.getStorageMovementQueue().get();
if (itemInfo == null) { if (itemInfo == null) {
continue; continue;
} }
@ -1403,7 +1392,7 @@ public class TestStoragePolicySatisfier {
// Check other element traversed in order and E, M, U, R, S should not be // Check other element traversed in order and E, M, U, R, S should not be
// added in queue which we already removed from expected list // added in queue which we already removed from expected list
for (String path : expectedTraverseOrder) { for (String path : expectedTraverseOrder) {
ItemInfo<Long> itemInfo = sps.getStorageMovementQueue().get(); ItemInfo itemInfo = sps.getStorageMovementQueue().get();
if (itemInfo == null) { if (itemInfo == null) {
continue; continue;
} }
@ -1717,17 +1706,17 @@ public class TestStoragePolicySatisfier {
public void waitForAttemptedItems(long expectedBlkMovAttemptedCount, public void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
int timeout) throws TimeoutException, InterruptedException { int timeout) throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier<Long> sps = final StoragePolicySatisfier sps =
(StoragePolicySatisfier<Long>) blockManager.getSPSManager() (StoragePolicySatisfier) blockManager.getSPSManager()
.getInternalSPSService(); .getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount, expectedBlkMovAttemptedCount,
((BlockStorageMovementAttemptedItems<Long>) (sps ((BlockStorageMovementAttemptedItems) (sps
.getAttemptedItemsMonitor())).getAttemptedItemsCount()); .getAttemptedItemsMonitor())).getAttemptedItemsCount());
return ((BlockStorageMovementAttemptedItems<Long>) (sps return ((BlockStorageMovementAttemptedItems) (sps
.getAttemptedItemsMonitor())) .getAttemptedItemsMonitor()))
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount; .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
} }
@ -1737,15 +1726,17 @@ public class TestStoragePolicySatisfier {
public void waitForBlocksMovementAttemptReport( public void waitForBlocksMovementAttemptReport(
long expectedMovementFinishedBlocksCount, int timeout) long expectedMovementFinishedBlocksCount, int timeout)
throws TimeoutException, InterruptedException { throws TimeoutException, InterruptedException {
Assert.assertNotNull("Didn't set external block move listener", BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
blkMoveListener); final StoragePolicySatisfier sps =
(StoragePolicySatisfier) blockManager.getSPSManager()
.getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
int actualCount = blkMoveListener.getActualBlockMovements().size(); int actualCount = ((BlockStorageMovementAttemptedItems) (sps
.getAttemptedItemsMonitor())).getAttemptedItemsCount();
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMovementFinishedBlocksCount, expectedMovementFinishedBlocksCount, actualCount);
actualCount);
return actualCount return actualCount
>= expectedMovementFinishedBlocksCount; >= expectedMovementFinishedBlocksCount;
} }
@ -1798,29 +1789,12 @@ public class TestStoragePolicySatisfier {
.numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn) .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
.storageTypes(storageTypes).storageCapacities(capacities).build(); .storageTypes(storageTypes).storageCapacities(capacities).build();
cluster.waitActive(); cluster.waitActive();
// Sets external listener for assertion.
blkMoveListener.clear();
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier<Long> sps =
(StoragePolicySatisfier<Long>) blockManager
.getSPSManager().getInternalSPSService();
sps.setBlockMovementListener(blkMoveListener);
return cluster; return cluster;
} }
public void restartNamenode() throws IOException { public void restartNamenode() throws IOException {
hdfsCluster.restartNameNodes(); hdfsCluster.restartNameNodes();
hdfsCluster.waitActive(); hdfsCluster.waitActive();
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
StoragePolicySatisfyManager spsMgr = blockManager.getSPSManager();
if (spsMgr != null && spsMgr.isInternalSatisfierRunning()) {
// Sets external listener for assertion.
blkMoveListener.clear();
final StoragePolicySatisfier<Long> sps =
(StoragePolicySatisfier<Long>) spsMgr.getInternalSPSService();
sps.setBlockMovementListener(blkMoveListener);
}
} }
/** /**

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier.ExternalBlockMovementListener;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -71,8 +70,6 @@ public class TestStoragePolicySatisfierWithStripedFile {
private int cellSize; private int cellSize;
private int defaultStripeBlockSize; private int defaultStripeBlockSize;
private Configuration conf; private Configuration conf;
private ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
private ErasureCodingPolicy getEcPolicy() { private ErasureCodingPolicy getEcPolicy() {
return StripedFileTestUtil.getDefaultECPolicy(); return StripedFileTestUtil.getDefaultECPolicy();
@ -94,6 +91,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
// Reduced refresh cycle to update latest datanodes. // Reduced refresh cycle to update latest datanodes.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS, conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000); 1000);
conf.setInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, 30);
initConfWithStripe(conf, defaultStripeBlockSize); initConfWithStripe(conf, defaultStripeBlockSize);
} }
@ -135,14 +134,6 @@ public class TestStoragePolicySatisfierWithStripedFile {
try { try {
cluster.waitActive(); cluster.waitActive();
// Sets external listener for assertion.
blkMoveListener.clear();
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier<Long> sps =
(StoragePolicySatisfier<Long>) blockManager
.getSPSManager().getInternalSPSService();
sps.setBlockMovementListener(blkMoveListener);
DistributedFileSystem dfs = cluster.getFileSystem(); DistributedFileSystem dfs = cluster.getFileSystem();
dfs.enableErasureCodingPolicy( dfs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName()); StripedFileTestUtil.getDefaultECPolicy().getName());
@ -253,14 +244,6 @@ public class TestStoragePolicySatisfierWithStripedFile {
try { try {
cluster.waitActive(); cluster.waitActive();
// Sets external listener for assertion.
blkMoveListener.clear();
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier<Long> sps =
(StoragePolicySatisfier<Long>) blockManager
.getSPSManager().getInternalSPSService();
sps.setBlockMovementListener(blkMoveListener);
DistributedFileSystem dfs = cluster.getFileSystem(); DistributedFileSystem dfs = cluster.getFileSystem();
dfs.enableErasureCodingPolicy( dfs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName()); StripedFileTestUtil.getDefaultECPolicy().getName());
@ -400,10 +383,11 @@ public class TestStoragePolicySatisfierWithStripedFile {
fs.satisfyStoragePolicy(fooFile); fs.satisfyStoragePolicy(fooFile);
DFSTestUtil.waitExpectedStorageType(fooFile.toString(), DFSTestUtil.waitExpectedStorageType(fooFile.toString(),
StorageType.ARCHIVE, 5, 30000, cluster.getFileSystem()); StorageType.ARCHIVE, 5, 30000, cluster.getFileSystem());
//Start reaming datanodes //Start remaining datanodes
for (int i = numOfDatanodes - 1; i >= 5; i--) { for (int i = numOfDatanodes - 1; i >= 5; i--) {
cluster.restartDataNode(list.get(i), false); cluster.restartDataNode(list.get(i), false);
} }
cluster.waitActive();
// verify storage types and locations. // verify storage types and locations.
waitExpectedStorageType(cluster, fooFile.toString(), fileLen, waitExpectedStorageType(cluster, fooFile.toString(), fileLen,
StorageType.ARCHIVE, 9, 9, 60000); StorageType.ARCHIVE, 9, 9, 60000);
@ -511,17 +495,17 @@ public class TestStoragePolicySatisfierWithStripedFile {
long expectedBlkMovAttemptedCount, int timeout) long expectedBlkMovAttemptedCount, int timeout)
throws TimeoutException, InterruptedException { throws TimeoutException, InterruptedException {
BlockManager blockManager = cluster.getNamesystem().getBlockManager(); BlockManager blockManager = cluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier<Long> sps = final StoragePolicySatisfier sps =
(StoragePolicySatisfier<Long>) blockManager (StoragePolicySatisfier) blockManager
.getSPSManager().getInternalSPSService(); .getSPSManager().getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount, expectedBlkMovAttemptedCount,
((BlockStorageMovementAttemptedItems<Long>) sps ((BlockStorageMovementAttemptedItems) sps
.getAttemptedItemsMonitor()).getAttemptedItemsCount()); .getAttemptedItemsMonitor()).getAttemptedItemsCount());
return ((BlockStorageMovementAttemptedItems<Long>) sps return ((BlockStorageMovementAttemptedItems) sps
.getAttemptedItemsMonitor()) .getAttemptedItemsMonitor())
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount; .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
} }
@ -583,12 +567,15 @@ public class TestStoragePolicySatisfierWithStripedFile {
private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster, private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
long expectedMoveFinishedBlks, int timeout) long expectedMoveFinishedBlks, int timeout)
throws TimeoutException, InterruptedException { throws TimeoutException, InterruptedException {
Assert.assertNotNull("Didn't set external block move listener", BlockManager blockManager = cluster.getNamesystem().getBlockManager();
blkMoveListener); final StoragePolicySatisfier sps =
(StoragePolicySatisfier) blockManager.getSPSManager()
.getInternalSPSService();
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
int actualCount = blkMoveListener.getActualBlockMovements().size(); int actualCount = ((BlockStorageMovementAttemptedItems) (sps
.getAttemptedItemsMonitor())).getMovementFinishedBlocksCount();
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMoveFinishedBlks, expectedMoveFinishedBlks,
actualCount); actualCount);

View File

@ -88,10 +88,8 @@ public class TestExternalStoragePolicySatisfier
private String principal; private String principal;
private MiniKdc kdc; private MiniKdc kdc;
private File baseDir; private File baseDir;
private StoragePolicySatisfier<String> externalSps; private StoragePolicySatisfier externalSps;
private ExternalSPSContext externalCtxt; private ExternalSPSContext externalCtxt;
private ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
@After @After
public void destroy() throws Exception { public void destroy() throws Exception {
@ -143,16 +141,10 @@ public class TestExternalStoragePolicySatisfier
nnc = getNameNodeConnector(getConf()); nnc = getNameNodeConnector(getConf());
externalSps = new StoragePolicySatisfier<String>(getConf()); externalSps = new StoragePolicySatisfier(getConf());
externalCtxt = new ExternalSPSContext(externalSps, nnc); externalCtxt = new ExternalSPSContext(externalSps, nnc);
blkMoveListener.clear(); externalSps.init(externalCtxt);
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(conf, nnc,
externalSps);
externalSps.init(externalCtxt,
new ExternalSPSFilePathCollector(externalSps), externalHandler,
blkMoveListener);
externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL); externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
return cluster; return cluster;
} }
@ -164,16 +156,10 @@ public class TestExternalStoragePolicySatisfier
getCluster().restartNameNodes(); getCluster().restartNameNodes();
getCluster().waitActive(); getCluster().waitActive();
externalSps = new StoragePolicySatisfier<>(getConf()); externalSps = new StoragePolicySatisfier(getConf());
externalCtxt = new ExternalSPSContext(externalSps, nnc); externalCtxt = new ExternalSPSContext(externalSps, nnc);
blkMoveListener.clear(); externalSps.init(externalCtxt);
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(getConf(), nnc,
externalSps);
externalSps.init(externalCtxt,
new ExternalSPSFilePathCollector(externalSps), externalHandler,
blkMoveListener);
externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL); externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
} }
@ -206,11 +192,11 @@ public class TestExternalStoragePolicySatisfier
public Boolean get() { public Boolean get() {
LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
expectedBlkMovAttemptedCount, expectedBlkMovAttemptedCount,
((BlockStorageMovementAttemptedItems<String>) (externalSps ((BlockStorageMovementAttemptedItems) (externalSps
.getAttemptedItemsMonitor())).getAttemptedItemsCount()); .getAttemptedItemsMonitor())).getAttemptedItemsCount());
return ((BlockStorageMovementAttemptedItems<String>) (externalSps return ((BlockStorageMovementAttemptedItems) (externalSps
.getAttemptedItemsMonitor())) .getAttemptedItemsMonitor()))
.getAttemptedItemsCount() == expectedBlkMovAttemptedCount; .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
} }
}, 100, timeout); }, 100, timeout);
} }
@ -218,12 +204,11 @@ public class TestExternalStoragePolicySatisfier
public void waitForBlocksMovementAttemptReport( public void waitForBlocksMovementAttemptReport(
long expectedMovementFinishedBlocksCount, int timeout) long expectedMovementFinishedBlocksCount, int timeout)
throws TimeoutException, InterruptedException { throws TimeoutException, InterruptedException {
Assert.assertNotNull("Didn't set external block move listener",
blkMoveListener);
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
int actualCount = blkMoveListener.getActualBlockMovements().size(); int actualCount = externalSps.getAttemptedItemsMonitor()
.getAttemptedItemsCount();
LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
expectedMovementFinishedBlocksCount, actualCount); expectedMovementFinishedBlocksCount, actualCount);
return actualCount return actualCount