HDFS-12225: [SPS]: Optimize extended attributes for tracking SPS movements. Contributed by Surendra Singh Lilhore.

This commit is contained in:
Uma Maheswara Rao G 2017-08-23 15:37:03 -07:00 committed by Uma Maheswara Rao Gangumalla
parent 0e820f16af
commit 7ea24fc06c
14 changed files with 588 additions and 258 deletions

View File

@ -89,7 +89,6 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath; import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@ -431,9 +430,6 @@ public class BlockManager implements BlockStatsMXBean {
private final StoragePolicySatisfier sps; private final StoragePolicySatisfier sps;
private final boolean storagePolicyEnabled; private final boolean storagePolicyEnabled;
private boolean spsEnabled; private boolean spsEnabled;
private final BlockStorageMovementNeeded storageMovementNeeded =
new BlockStorageMovementNeeded();
/** Minimum live replicas needed for the datanode to be transitioned /** Minimum live replicas needed for the datanode to be transitioned
* from ENTERING_MAINTENANCE to IN_MAINTENANCE. * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
*/ */
@ -480,8 +476,7 @@ public class BlockManager implements BlockStatsMXBean {
conf.getBoolean( conf.getBoolean(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT); DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
sps = new StoragePolicySatisfier(namesystem, storageMovementNeeded, this, sps = new StoragePolicySatisfier(namesystem, this, conf);
conf);
blockTokenSecretManager = createBlockTokenSecretManager(conf); blockTokenSecretManager = createBlockTokenSecretManager(conf);
providedStorageMap = new ProvidedStorageMap(namesystem, this, conf); providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@ -5016,20 +5011,6 @@ public class BlockManager implements BlockStatsMXBean {
return providedStorageMap; return providedStorageMap;
} }
/**
* Set file block collection for which storage movement needed for its blocks.
*
* @param id
* - file block collection id.
*/
public void satisfyStoragePolicy(long id) {
storageMovementNeeded.add(id);
if (LOG.isDebugEnabled()) {
LOG.debug("Added block collection id {} to block "
+ "storageMovementNeeded queue", id);
}
}
/** /**
* Gets the storage policy satisfier instance. * Gets the storage policy satisfier instance.
* *

View File

@ -1751,6 +1751,13 @@ public class DatanodeManager {
} }
} }
if (nodeinfo.shouldDropSPSWork()) {
cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
// Set back to false to indicate that the new value has been sent to the
// datanode.
nodeinfo.setDropSPSWork(false);
}
// check pending block storage movement tasks // check pending block storage movement tasks
BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo
.getBlocksToMoveStorages(); .getBlocksToMoveStorages();
@ -1762,13 +1769,6 @@ public class DatanodeManager {
blkStorageMovementInfosBatch.getBlockMovingInfo())); blkStorageMovementInfosBatch.getBlockMovingInfo()));
} }
if (nodeinfo.shouldDropSPSWork()) {
cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
// Set back to false to indicate that the new value has been sent to the
// datanode.
nodeinfo.setDropSPSWork(false);
}
if (!cmds.isEmpty()) { if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]); return cmds.toArray(new DatanodeCommand[cmds.size()]);
} }

View File

@ -837,6 +837,7 @@ class BPOfferService {
case DatanodeProtocol.DNA_UNCACHE: case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT: case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction()); LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
break; break;
default: default:

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
@ -54,7 +55,7 @@ public class BlockStorageMovementAttemptedItems {
* A map holds the items which are already taken for blocks movements * A map holds the items which are already taken for blocks movements
* processing and sent to DNs. * processing and sent to DNs.
*/ */
private final Map<Long, ItemInfo> storageMovementAttemptedItems; private final Map<Long, AttemptedItemInfo> storageMovementAttemptedItems;
private final List<BlocksStorageMovementResult> storageMovementAttemptedResults; private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
private volatile boolean monitorRunning = true; private volatile boolean monitorRunning = true;
private Daemon timerThread = null; private Daemon timerThread = null;
@ -91,18 +92,19 @@ public class BlockStorageMovementAttemptedItems {
* Add item to block storage movement attempted items map which holds the * Add item to block storage movement attempted items map which holds the
* tracking/blockCollection id versus time stamp. * tracking/blockCollection id versus time stamp.
* *
* @param blockCollectionID * @param itemInfo
* - tracking id / block collection id * - tracking info
* @param allBlockLocsAttemptedToSatisfy * @param allBlockLocsAttemptedToSatisfy
* - failed to find matching target nodes to satisfy storage type for * - failed to find matching target nodes to satisfy storage type
* all the block locations of the given blockCollectionID * for all the block locations of the given blockCollectionID
*/ */
public void add(Long blockCollectionID, public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
boolean allBlockLocsAttemptedToSatisfy) {
synchronized (storageMovementAttemptedItems) { synchronized (storageMovementAttemptedItems) {
ItemInfo itemInfo = new ItemInfo(monotonicNow(), AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
itemInfo.getRootId(), itemInfo.getTrackId(), monotonicNow(),
allBlockLocsAttemptedToSatisfy); allBlockLocsAttemptedToSatisfy);
storageMovementAttemptedItems.put(blockCollectionID, itemInfo); storageMovementAttemptedItems.put(itemInfo.getTrackId(),
attemptedItemInfo);
} }
} }
@ -167,21 +169,27 @@ public class BlockStorageMovementAttemptedItems {
* satisfy storage policy. This is used by * satisfy storage policy. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}. * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
*/ */
private final static class ItemInfo { private final static class AttemptedItemInfo extends ItemInfo {
private long lastAttemptedOrReportedTime; private long lastAttemptedOrReportedTime;
private final boolean allBlockLocsAttemptedToSatisfy; private final boolean allBlockLocsAttemptedToSatisfy;
/** /**
* ItemInfo constructor. * AttemptedItemInfo constructor.
* *
* @param rootId
* rootId for trackId
* @param trackId
* trackId for file.
* @param lastAttemptedOrReportedTime * @param lastAttemptedOrReportedTime
* last attempted or reported time * last attempted or reported time
* @param allBlockLocsAttemptedToSatisfy * @param allBlockLocsAttemptedToSatisfy
* whether all the blocks in the trackID were attempted and blocks * whether all the blocks in the trackID were attempted and blocks
* movement has been scheduled to satisfy storage policy * movement has been scheduled to satisfy storage policy
*/ */
private ItemInfo(long lastAttemptedOrReportedTime, private AttemptedItemInfo(long rootId, long trackId,
long lastAttemptedOrReportedTime,
boolean allBlockLocsAttemptedToSatisfy) { boolean allBlockLocsAttemptedToSatisfy) {
super(rootId, trackId);
this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime; this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy; this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
} }
@ -211,6 +219,7 @@ public class BlockStorageMovementAttemptedItems {
private void touchLastReportedTimeStamp() { private void touchLastReportedTimeStamp() {
this.lastAttemptedOrReportedTime = monotonicNow(); this.lastAttemptedOrReportedTime = monotonicNow();
} }
} }
/** /**
@ -239,18 +248,20 @@ public class BlockStorageMovementAttemptedItems {
@VisibleForTesting @VisibleForTesting
void blocksStorageMovementUnReportedItemsCheck() { void blocksStorageMovementUnReportedItemsCheck() {
synchronized (storageMovementAttemptedItems) { synchronized (storageMovementAttemptedItems) {
Iterator<Entry<Long, ItemInfo>> iter = storageMovementAttemptedItems Iterator<Entry<Long, AttemptedItemInfo>> iter =
.entrySet().iterator(); storageMovementAttemptedItems.entrySet().iterator();
long now = monotonicNow(); long now = monotonicNow();
while (iter.hasNext()) { while (iter.hasNext()) {
Entry<Long, ItemInfo> entry = iter.next(); Entry<Long, AttemptedItemInfo> entry = iter.next();
ItemInfo itemInfo = entry.getValue(); AttemptedItemInfo itemInfo = entry.getValue();
if (now > itemInfo.getLastAttemptedOrReportedTime() if (now > itemInfo.getLastAttemptedOrReportedTime()
+ selfRetryTimeout) { + selfRetryTimeout) {
Long blockCollectionID = entry.getKey(); Long blockCollectionID = entry.getKey();
synchronized (storageMovementAttemptedResults) { synchronized (storageMovementAttemptedResults) {
if (!isExistInResult(blockCollectionID)) { if (!isExistInResult(blockCollectionID)) {
blockStorageMovementNeeded.add(blockCollectionID); ItemInfo candidate = new ItemInfo(
itemInfo.getRootId(), blockCollectionID);
blockStorageMovementNeeded.add(candidate);
iter.remove(); iter.remove();
LOG.info("TrackID: {} becomes timed out and moved to needed " LOG.info("TrackID: {} becomes timed out and moved to needed "
+ "retries queue for next iteration.", blockCollectionID); + "retries queue for next iteration.", blockCollectionID);
@ -297,17 +308,30 @@ public class BlockStorageMovementAttemptedItems {
synchronized (storageMovementAttemptedItems) { synchronized (storageMovementAttemptedItems) {
Status status = storageMovementAttemptedResult.getStatus(); Status status = storageMovementAttemptedResult.getStatus();
long trackId = storageMovementAttemptedResult.getTrackId(); long trackId = storageMovementAttemptedResult.getTrackId();
ItemInfo itemInfo; AttemptedItemInfo attemptedItemInfo = storageMovementAttemptedItems
.get(trackId);
// itemInfo is null means no root for trackId, using trackId only as
// root and handling it in
// blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning
// the xAttr
ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
? attemptedItemInfo.getRootId() : trackId, trackId);
switch (status) { switch (status) {
case FAILURE: case FAILURE:
blockStorageMovementNeeded.add(trackId); if (attemptedItemInfo != null) {
LOG.warn("Blocks storage movement results for the tracking id: {}" blockStorageMovementNeeded.add(itemInfo);
+ " is reported from co-ordinating datanode, but result" LOG.warn("Blocks storage movement results for the tracking id:"
+ " status is FAILURE. So, added for retry", trackId); + "{} is reported from co-ordinating datanode, but result"
+ " status is FAILURE. So, added for retry", trackId);
} else {
LOG.info("Blocks storage movement is FAILURE for the track"
+ " id {}. But the trackID doesn't exists in"
+ " storageMovementAttemptedItems list.", trackId);
blockStorageMovementNeeded
.removeItemTrackInfo(itemInfo);
}
break; break;
case SUCCESS: case SUCCESS:
itemInfo = storageMovementAttemptedItems.get(trackId);
// ItemInfo could be null. One case is, before the blocks movements // ItemInfo could be null. One case is, before the blocks movements
// result arrives the attempted trackID became timed out and then // result arrives the attempted trackID became timed out and then
// removed the trackID from the storageMovementAttemptedItems list. // removed the trackID from the storageMovementAttemptedItems list.
@ -318,33 +342,32 @@ public class BlockStorageMovementAttemptedItems {
// storage policy in previous SPS iteration. // storage policy in previous SPS iteration.
String msg = "Blocks storage movement is SUCCESS for the track id: " String msg = "Blocks storage movement is SUCCESS for the track id: "
+ trackId + " reported from co-ordinating datanode."; + trackId + " reported from co-ordinating datanode.";
if (itemInfo != null) { if (attemptedItemInfo != null) {
if (!itemInfo.isAllBlockLocsAttemptedToSatisfy()) { if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
blockStorageMovementNeeded.add(trackId); blockStorageMovementNeeded
.add(new ItemInfo(attemptedItemInfo.getRootId(), trackId));
LOG.warn("{} But adding trackID back to retry queue as some of" LOG.warn("{} But adding trackID back to retry queue as some of"
+ " the blocks couldn't find matching target nodes in" + " the blocks couldn't find matching target nodes in"
+ " previous SPS iteration.", msg); + " previous SPS iteration.", msg);
} else { } else {
LOG.info(msg); LOG.info(msg);
// Remove xattr for the track id. blockStorageMovementNeeded
this.sps.postBlkStorageMovementCleanup( .removeItemTrackInfo(itemInfo);
storageMovementAttemptedResult.getTrackId());
} }
} else { } else {
LOG.info("{} But the trackID doesn't exists in " LOG.info("{} But the trackID doesn't exists in "
+ "storageMovementAttemptedItems list", msg); + "storageMovementAttemptedItems list", msg);
// Remove xattr for the track id. blockStorageMovementNeeded
this.sps.postBlkStorageMovementCleanup( .removeItemTrackInfo(itemInfo);
storageMovementAttemptedResult.getTrackId());
} }
break; break;
case IN_PROGRESS: case IN_PROGRESS:
isInprogress = true; isInprogress = true;
itemInfo = storageMovementAttemptedItems attemptedItemInfo = storageMovementAttemptedItems
.get(storageMovementAttemptedResult.getTrackId()); .get(storageMovementAttemptedResult.getTrackId());
if(itemInfo != null){ if(attemptedItemInfo != null){
// update the attempted expiration time to next cycle. // update the attempted expiration time to next cycle.
itemInfo.touchLastReportedTimeStamp(); attemptedItemInfo.touchLastReportedTimeStamp();
} }
break; break;
default: default:

View File

@ -17,28 +17,86 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A Class to track the block collection IDs for which physical storage movement * A Class to track the block collection IDs (Inode's ID) for which physical
* needed as per the Namespace and StorageReports from DN. * storage movement needed as per the Namespace and StorageReports from DN.
* It scan the pending directories for which storage movement is required and
* schedule the block collection IDs for movement. It track the info of
* scheduled items and remove the SPS xAttr from the file/Directory once
* movement is success.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class BlockStorageMovementNeeded { public class BlockStorageMovementNeeded {
private final Queue<Long> storageMovementNeeded = new LinkedList<Long>();
public static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
private final Queue<ItemInfo> storageMovementNeeded =
new LinkedList<ItemInfo>();
/** /**
* Add the block collection id to tracking list for which storage movement * Map of rootId and number of child's. Number of child's indicate the number
* of files pending to satisfy the policy.
*/
private final Map<Long, Integer> pendingWorkForDirectory =
new HashMap<Long, Integer>();
private final Namesystem namesystem;
// List of pending dir to satisfy the policy
private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
private final StoragePolicySatisfier sps;
private Daemon fileInodeIdCollector;
public BlockStorageMovementNeeded(Namesystem namesystem,
StoragePolicySatisfier sps) {
this.namesystem = namesystem;
this.sps = sps;
}
/**
* Add the candidate to tracking list for which storage movement
* expected if necessary. * expected if necessary.
* *
* @param blockCollectionID * @param trackInfo
* - block collection id, which is nothing but inode id. * - track info for satisfy the policy
*/ */
public synchronized void add(Long blockCollectionID) { public synchronized void add(ItemInfo trackInfo) {
storageMovementNeeded.add(blockCollectionID); storageMovementNeeded.add(trackInfo);
}
/**
* Add the itemInfo to tracking list for which storage movement
* expected if necessary.
* @param rootId
* - root inode id
* @param itemInfoList
* - List of child in the directory
*/
private synchronized void addAll(Long rootId,
List<ItemInfo> itemInfoList) {
storageMovementNeeded.addAll(itemInfoList);
pendingWorkForDirectory.put(rootId, itemInfoList.size());
} }
/** /**
@ -47,11 +105,168 @@ public class BlockStorageMovementNeeded {
* *
* @return block collection ID * @return block collection ID
*/ */
public synchronized Long get() { public synchronized ItemInfo get() {
return storageMovementNeeded.poll(); return storageMovementNeeded.poll();
} }
public synchronized void addToPendingDirQueue(long id) {
spsDirsToBeTraveresed.add(id);
// Notify waiting FileInodeIdCollector thread about the newly
// added SPS path.
synchronized (spsDirsToBeTraveresed) {
spsDirsToBeTraveresed.notify();
}
}
public synchronized void clearAll() { public synchronized void clearAll() {
spsDirsToBeTraveresed.clear();
storageMovementNeeded.clear(); storageMovementNeeded.clear();
pendingWorkForDirectory.clear();
}
/**
* Decrease the pending child count for directory once one file blocks moved
* successfully. Remove the SPS xAttr if pending child count is zero.
*/
public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
throws IOException {
if (trackInfo.isDir()) {
// If track is part of some root then reduce the pending directory work
// count.
long rootId = trackInfo.getRootId();
INode inode = namesystem.getFSDirectory().getInode(rootId);
if (inode == null) {
// directory deleted just remove it.
this.pendingWorkForDirectory.remove(rootId);
} else {
if (pendingWorkForDirectory.get(rootId) != null) {
Integer pendingWork = pendingWorkForDirectory.get(rootId) - 1;
pendingWorkForDirectory.put(rootId, pendingWork);
if (pendingWork <= 0) {
namesystem.removeXattr(rootId, XATTR_SATISFY_STORAGE_POLICY);
pendingWorkForDirectory.remove(rootId);
}
}
}
} else {
// Remove xAttr if trackID doesn't exist in
// storageMovementAttemptedItems or file policy satisfied.
namesystem.removeXattr(trackInfo.getTrackId(),
XATTR_SATISFY_STORAGE_POLICY);
}
}
public synchronized void clearQueue(long trackId) {
spsDirsToBeTraveresed.remove(trackId);
Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
while (iterator.hasNext()) {
ItemInfo next = iterator.next();
if (next.getRootId() == trackId) {
iterator.remove();
}
}
pendingWorkForDirectory.remove(trackId);
}
/**
* Clean all the movements in spsDirsToBeTraveresed/storageMovementNeeded
* and notify to clean up required resources.
* @throws IOException
*/
public synchronized void clearQueuesWithNotification() {
// Remove xAttr from directories
Long trackId;
while ((trackId = spsDirsToBeTraveresed.poll()) != null) {
try {
// Remove xAttr for file
namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
} catch (IOException ie) {
LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie);
}
}
// File's directly added to storageMovementNeeded, So try to remove
// xAttr for file
ItemInfo itemInfo;
while ((itemInfo = storageMovementNeeded.poll()) != null) {
try {
// Remove xAttr for file
if (!itemInfo.isDir()) {
namesystem.removeXattr(itemInfo.getTrackId(),
XATTR_SATISFY_STORAGE_POLICY);
}
} catch (IOException ie) {
LOG.warn(
"Failed to remove SPS xattr for track id "
+ itemInfo.getTrackId(), ie);
}
}
this.clearAll();
}
/**
* Take dir tack ID from the spsDirsToBeTraveresed queue and collect child
* ID's to process for satisfy the policy.
*/
private class FileInodeIdCollector implements Runnable {
@Override
public void run() {
LOG.info("Starting FileInodeIdCollector!.");
while (namesystem.isRunning() && sps.isRunning()) {
try {
if (!namesystem.isInSafeMode()) {
FSDirectory fsd = namesystem.getFSDirectory();
Long rootINodeId = spsDirsToBeTraveresed.poll();
if (rootINodeId == null) {
// Waiting for SPS path
synchronized (spsDirsToBeTraveresed) {
spsDirsToBeTraveresed.wait(5000);
}
} else {
INode rootInode = fsd.getInode(rootINodeId);
if (rootInode != null) {
// TODO : HDFS-12291
// 1. Implement an efficient recursive directory iteration
// mechanism and satisfies storage policy for all the files
// under the given directory.
// 2. Process files in batches,so datanodes workload can be
// handled.
List<ItemInfo> itemInfoList =
new ArrayList<>();
for (INode childInode : rootInode.asDirectory()
.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
if (childInode.isFile()
&& childInode.asFile().numBlocks() != 0) {
itemInfoList.add(
new ItemInfo(rootINodeId, childInode.getId()));
}
}
if (itemInfoList.isEmpty()) {
// satisfy track info is empty, so remove the xAttr from the
// directory
namesystem.removeXattr(rootINodeId,
XATTR_SATISFY_STORAGE_POLICY);
}
addAll(rootINodeId, itemInfoList);
}
}
}
} catch (Throwable t) {
LOG.warn("Exception while loading inodes to satisfy the policy", t);
}
}
}
}
public void start() {
fileInodeIdCollector = new Daemon(new FileInodeIdCollector());
fileInodeIdCollector.setName("FileInodeIdCollector");
fileInodeIdCollector.start();
}
public void stop() {
if (fileInodeIdCollector != null) {
fileInodeIdCollector.interrupt();
}
} }
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
@ -31,6 +30,7 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -60,10 +60,24 @@ final class FSDirSatisfyStoragePolicyOp {
if (fsd.isPermissionEnabled()) { if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE); fsd.checkPathAccess(pc, iip, FsAction.WRITE);
} }
XAttr satisfyXAttr = unprotectedSatisfyStoragePolicy(iip, bm, fsd); INode inode = FSDirectory.resolveLastINode(iip);
if (satisfyXAttr != null) { if (inodeHasSatisfyXAttr(inode)) {
throw new IOException(
"Cannot request to call satisfy storage policy on path "
+ inode.getFullPathName()
+ ", as this file/dir was already called for satisfying "
+ "storage policy.");
}
if (unprotectedSatisfyStoragePolicy(inode, fsd)) {
XAttr satisfyXAttr = XAttrHelper
.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1); List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
xAttrs.add(satisfyXAttr); xAttrs.add(satisfyXAttr);
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs,
xAttrs, EnumSet.of(XAttrSetFlag.CREATE));
XAttrStorage.updateINodeXAttrs(inode, newXAttrs,
iip.getLatestSnapshotId());
fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache); fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
} }
} finally { } finally {
@ -72,62 +86,29 @@ final class FSDirSatisfyStoragePolicyOp {
return fsd.getAuditFileInfo(iip); return fsd.getAuditFileInfo(iip);
} }
static XAttr unprotectedSatisfyStoragePolicy(INodesInPath iip, static boolean unprotectedSatisfyStoragePolicy(INode inode, FSDirectory fsd) {
BlockManager bm, FSDirectory fsd) throws IOException {
final INode inode = FSDirectory.resolveLastINode(iip);
final int snapshotId = iip.getLatestSnapshotId();
final List<INode> candidateNodes = new ArrayList<>();
// TODO: think about optimization here, label the dir instead
// of the sub-files of the dir.
if (inode.isFile() && inode.asFile().numBlocks() != 0) { if (inode.isFile() && inode.asFile().numBlocks() != 0) {
candidateNodes.add(inode); // Adding directly in the storageMovementNeeded queue, So it can
} else if (inode.isDirectory()) { // get more priority compare to directory.
for (INode node : inode.asDirectory().getChildrenList(snapshotId)) { fsd.getBlockManager().getStoragePolicySatisfier()
if (node.isFile() && node.asFile().numBlocks() != 0) { .satisfyStoragePolicy(inode.getId());
candidateNodes.add(node); return true;
} } else if (inode.isDirectory()
} && inode.asDirectory().getChildrenNum(Snapshot.CURRENT_STATE_ID) > 0) {
// Adding directory in the pending queue, so FileInodeIdCollector process
// directory child in batch and recursively
fsd.getBlockManager().getStoragePolicySatisfier()
.addInodeToPendingDirQueue(inode.getId());
return true;
} }
return false;
if (candidateNodes.isEmpty()) {
return null;
}
// If node has satisfy xattr, then stop adding it
// to satisfy movement queue.
if (inodeHasSatisfyXAttr(candidateNodes)) {
throw new IOException(
"Cannot request to call satisfy storage policy on path "
+ iip.getPath()
+ ", as this file/dir was already called for satisfying "
+ "storage policy.");
}
final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
final XAttr satisfyXAttr = XAttrHelper
.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
xattrs.add(satisfyXAttr);
for (INode node : candidateNodes) {
bm.satisfyStoragePolicy(node.getId());
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(node);
List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs,
xattrs, EnumSet.of(XAttrSetFlag.CREATE));
XAttrStorage.updateINodeXAttrs(node, newXAttrs, snapshotId);
}
return satisfyXAttr;
} }
private static boolean inodeHasSatisfyXAttr(List<INode> candidateNodes) { private static boolean inodeHasSatisfyXAttr(INode inode) {
// If the node is a directory and one of the child files final XAttrFeature f = inode.getXAttrFeature();
// has satisfy xattr, then return true for this directory. if (inode.isFile() && f != null
for (INode inode : candidateNodes) { && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) {
final XAttrFeature f = inode.getXAttrFeature(); return true;
if (inode.isFile() && f != null
&& f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) {
return true;
}
} }
return false; return false;
} }

View File

@ -206,6 +206,14 @@ class FSDirXAttrOp {
List<XAttr> newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove, List<XAttr> newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove,
removedXAttrs); removedXAttrs);
if (existingXAttrs.size() != newXAttrs.size()) { if (existingXAttrs.size() != newXAttrs.size()) {
for (XAttr xattr : toRemove) {
if (XATTR_SATISFY_STORAGE_POLICY
.equals(XAttrHelper.getPrefixedName(xattr))) {
fsd.getBlockManager().getStoragePolicySatisfier()
.clearQueue(inode.getId());
break;
}
}
XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId); XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
return removedXAttrs; return removedXAttrs;
} }
@ -297,8 +305,7 @@ class FSDirXAttrOp {
// Add inode id to movement queue if xattrs contain satisfy xattr. // Add inode id to movement queue if xattrs contain satisfy xattr.
if (XATTR_SATISFY_STORAGE_POLICY.equals(xaName)) { if (XATTR_SATISFY_STORAGE_POLICY.equals(xaName)) {
FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(iip, FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(inode, fsd);
fsd.getBlockManager(), fsd);
continue; continue;
} }

View File

@ -1415,7 +1415,7 @@ public class FSDirectory implements Closeable {
if (xattr == null) { if (xattr == null) {
return; return;
} }
getBlockManager().satisfyStoragePolicy(inode.getId()); FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(inode, this);
} }
private void addEncryptionZone(INodeWithAdditionalFields inode, private void addEncryptionZone(INodeWithAdditionalFields inode,

View File

@ -1322,7 +1322,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
writeLock(); writeLock();
try { try {
if (blockManager != null) { if (blockManager != null) {
blockManager.stopSPS(true); blockManager.stopSPS(false);
} }
stopSecretManager(); stopSecretManager();
leaseManager.stopMonitor(); leaseManager.stopMonitor();

View File

@ -17,9 +17,6 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -106,10 +103,10 @@ public class StoragePolicySatisfier implements Runnable {
} }
public StoragePolicySatisfier(final Namesystem namesystem, public StoragePolicySatisfier(final Namesystem namesystem,
final BlockStorageMovementNeeded storageMovementNeeded,
final BlockManager blkManager, Configuration conf) { final BlockManager blkManager, Configuration conf) {
this.namesystem = namesystem; this.namesystem = namesystem;
this.storageMovementNeeded = storageMovementNeeded; this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
this);
this.blockManager = blkManager; this.blockManager = blkManager;
this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems( this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
conf.getLong( conf.getLong(
@ -146,7 +143,7 @@ public class StoragePolicySatisfier implements Runnable {
// Ensure that all the previously submitted block movements(if any) have to // Ensure that all the previously submitted block movements(if any) have to
// be stopped in all datanodes. // be stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs(); addDropSPSWorkCommandsToAllDNs();
storageMovementNeeded.start();
storagePolicySatisfierThread = new Daemon(this); storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier"); storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start(); storagePolicySatisfierThread.start();
@ -162,14 +159,17 @@ public class StoragePolicySatisfier implements Runnable {
*/ */
public synchronized void disable(boolean forceStop) { public synchronized void disable(boolean forceStop) {
isRunning = false; isRunning = false;
if (storagePolicySatisfierThread == null) { if (storagePolicySatisfierThread == null) {
return; return;
} }
storageMovementNeeded.stop();
storagePolicySatisfierThread.interrupt(); storagePolicySatisfierThread.interrupt();
this.storageMovementsMonitor.stop(); this.storageMovementsMonitor.stop();
if (forceStop) { if (forceStop) {
this.clearQueuesWithNotification(); storageMovementNeeded.clearQueuesWithNotification();
addDropSPSWorkCommandsToAllDNs(); addDropSPSWorkCommandsToAllDNs();
} else { } else {
LOG.info("Stopping StoragePolicySatisfier."); LOG.info("Stopping StoragePolicySatisfier.");
@ -184,6 +184,7 @@ public class StoragePolicySatisfier implements Runnable {
disable(true); disable(true);
} }
this.storageMovementsMonitor.stopGracefully(); this.storageMovementsMonitor.stopGracefully();
if (storagePolicySatisfierThread == null) { if (storagePolicySatisfierThread == null) {
return; return;
} }
@ -220,10 +221,11 @@ public class StoragePolicySatisfier implements Runnable {
while (namesystem.isRunning() && isRunning) { while (namesystem.isRunning() && isRunning) {
try { try {
if (!namesystem.isInSafeMode()) { if (!namesystem.isInSafeMode()) {
Long blockCollectionID = storageMovementNeeded.get(); ItemInfo itemInfo = storageMovementNeeded.get();
if (blockCollectionID != null) { if (itemInfo != null) {
long trackId = itemInfo.getTrackId();
BlockCollection blockCollection = BlockCollection blockCollection =
namesystem.getBlockCollection(blockCollectionID); namesystem.getBlockCollection(trackId);
// Check blockCollectionId existence. // Check blockCollectionId existence.
if (blockCollection != null) { if (blockCollection != null) {
BlocksMovingAnalysisStatus status = BlocksMovingAnalysisStatus status =
@ -234,21 +236,21 @@ public class StoragePolicySatisfier implements Runnable {
// Just add to monitor, so it will be tracked for result and // Just add to monitor, so it will be tracked for result and
// be removed on successful storage movement result. // be removed on successful storage movement result.
case ALL_BLOCKS_TARGETS_PAIRED: case ALL_BLOCKS_TARGETS_PAIRED:
this.storageMovementsMonitor.add(blockCollectionID, true); this.storageMovementsMonitor.add(itemInfo, true);
break; break;
// Add to monitor with allBlcoksAttemptedToSatisfy flag false, so // Add to monitor with allBlcoksAttemptedToSatisfy flag false, so
// that it will be tracked and still it will be consider for retry // that it will be tracked and still it will be consider for retry
// as analysis was not found targets for storage movement blocks. // as analysis was not found targets for storage movement blocks.
case FEW_BLOCKS_TARGETS_PAIRED: case FEW_BLOCKS_TARGETS_PAIRED:
this.storageMovementsMonitor.add(blockCollectionID, false); this.storageMovementsMonitor.add(itemInfo, false);
break; break;
case FEW_LOW_REDUNDANCY_BLOCKS: case FEW_LOW_REDUNDANCY_BLOCKS:
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Adding trackID " + blockCollectionID LOG.debug("Adding trackID " + trackId
+ " back to retry queue as some of the blocks" + " back to retry queue as some of the blocks"
+ " are low redundant."); + " are low redundant.");
} }
this.storageMovementNeeded.add(blockCollectionID); this.storageMovementNeeded.add(itemInfo);
break; break;
// Just clean Xattrs // Just clean Xattrs
case BLOCKS_TARGET_PAIRING_SKIPPED: case BLOCKS_TARGET_PAIRING_SKIPPED:
@ -256,9 +258,13 @@ public class StoragePolicySatisfier implements Runnable {
default: default:
LOG.info("Block analysis skipped or blocks already satisfied" LOG.info("Block analysis skipped or blocks already satisfied"
+ " with storages. So, Cleaning up the Xattrs."); + " with storages. So, Cleaning up the Xattrs.");
postBlkStorageMovementCleanup(blockCollectionID); storageMovementNeeded.removeItemTrackInfo(itemInfo);
break; break;
} }
} else {
// File doesn't exists (maybe got deleted), remove trackId from
// the queue
storageMovementNeeded.removeItemTrackInfo(itemInfo);
} }
} }
} }
@ -828,31 +834,63 @@ public class StoragePolicySatisfier implements Runnable {
} }
/** /**
* Clean all the movements in storageMovementNeeded and notify * Set file inode in queue for which storage movement needed for its blocks.
* to clean up required resources. *
* @throws IOException * @param inodeId
* - file inode/blockcollection id.
*/ */
private void clearQueuesWithNotification() { public void satisfyStoragePolicy(Long inodeId) {
Long id; //For file rootId and trackId is same
while ((id = storageMovementNeeded.get()) != null) { storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
try { if (LOG.isDebugEnabled()) {
postBlkStorageMovementCleanup(id); LOG.debug("Added track info for inode {} to block "
} catch (IOException ie) { + "storageMovementNeeded queue", inodeId);
LOG.warn("Failed to remove SPS "
+ "xattr for collection id " + id, ie);
}
} }
} }
public void addInodeToPendingDirQueue(long id) {
storageMovementNeeded.addToPendingDirQueue(id);
}
/** /**
* When block movement has been finished successfully, some additional * Clear queues for given track id.
* operations should be notified, for example, SPS xattr should be
* removed.
* @param trackId track id i.e., block collection id.
* @throws IOException
*/ */
public void postBlkStorageMovementCleanup(long trackId) public void clearQueue(long trackId) {
throws IOException { storageMovementNeeded.clearQueue(trackId);
this.namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY); }
/**
* ItemInfo is a file info object for which need to satisfy the
* policy.
*/
public static class ItemInfo {
private long rootId;
private long trackId;
public ItemInfo(long rootId, long trackId) {
this.rootId = rootId;
this.trackId = trackId;
}
/**
* Return the root of the current track Id.
*/
public long getRootId() {
return rootId;
}
/**
* Return the File inode Id for which needs to satisfy the policy.
*/
public long getTrackId() {
return trackId;
}
/**
* Returns true if the tracking path is a directory, false otherwise.
*/
public boolean isDir() {
return (rootId != trackId);
}
} }
} }

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@ -115,9 +114,7 @@ public class TestStoragePolicySatisfyWorker {
// move to ARCHIVE // move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD"); dfs.setStoragePolicy(new Path(file), "COLD");
FSNamesystem namesystem = cluster.getNamesystem(); dfs.satisfyStoragePolicy(new Path(file));
INode inode = namesystem.getFSDirectory().getINode(file);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
cluster.triggerHeartbeats(); cluster.triggerHeartbeats();

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.monotonicNow; import static org.apache.hadoop.util.Time.monotonicNow;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -38,7 +39,9 @@ public class TestBlockStorageMovementAttemptedItems {
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(); unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
Mockito.mock(Namesystem.class),
Mockito.mock(StoragePolicySatisfier.class));
StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
selfRetryTimeout, unsatisfiedStorageMovementFiles, sps); selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
@ -57,9 +60,9 @@ public class TestBlockStorageMovementAttemptedItems {
long stopTime = monotonicNow() + (retryTimeout * 2); long stopTime = monotonicNow() + (retryTimeout * 2);
boolean isItemFound = false; boolean isItemFound = false;
while (monotonicNow() < (stopTime)) { while (monotonicNow() < (stopTime)) {
Long ele = null; ItemInfo ele = null;
while ((ele = unsatisfiedStorageMovementFiles.get()) != null) { while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
if (item.longValue() == ele.longValue()) { if (item == ele.getTrackId()) {
isItemFound = true; isItemFound = true;
break; break;
} }
@ -77,7 +80,7 @@ public class TestBlockStorageMovementAttemptedItems {
public void testAddResultWithFailureResult() throws Exception { public void testAddResultWithFailureResult() throws Exception {
bsmAttemptedItems.start(); // start block movement result monitor thread bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234); Long item = new Long(1234);
bsmAttemptedItems.add(item, true); bsmAttemptedItems.add(new ItemInfo(0L, item), true);
bsmAttemptedItems.addResults( bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)}); item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
@ -88,7 +91,7 @@ public class TestBlockStorageMovementAttemptedItems {
public void testAddResultWithSucessResult() throws Exception { public void testAddResultWithSucessResult() throws Exception {
bsmAttemptedItems.start(); // start block movement result monitor thread bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234); Long item = new Long(1234);
bsmAttemptedItems.add(item, true); bsmAttemptedItems.add(new ItemInfo(0L, item), true);
bsmAttemptedItems.addResults( bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)}); item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@ -99,7 +102,7 @@ public class TestBlockStorageMovementAttemptedItems {
public void testNoResultAdded() throws Exception { public void testNoResultAdded() throws Exception {
bsmAttemptedItems.start(); // start block movement result monitor thread bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234); Long item = new Long(1234);
bsmAttemptedItems.add(item, true); bsmAttemptedItems.add(new ItemInfo(0L, item), true);
// After self retry timeout, it should be added back for retry // After self retry timeout, it should be added back for retry
assertTrue("Failed to add to the retry list", assertTrue("Failed to add to the retry list",
checkItemMovedForRetry(item, 600)); checkItemMovedForRetry(item, 600));
@ -115,7 +118,7 @@ public class TestBlockStorageMovementAttemptedItems {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried1() throws Exception { public void testPartialBlockMovementShouldBeRetried1() throws Exception {
Long item = new Long(1234); Long item = new Long(1234);
bsmAttemptedItems.add(item, false); bsmAttemptedItems.add(new ItemInfo(0L, item), false);
bsmAttemptedItems.addResults( bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)}); item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@ -136,7 +139,7 @@ public class TestBlockStorageMovementAttemptedItems {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried2() throws Exception { public void testPartialBlockMovementShouldBeRetried2() throws Exception {
Long item = new Long(1234); Long item = new Long(1234);
bsmAttemptedItems.add(item, false); bsmAttemptedItems.add(new ItemInfo(0L, item), false);
bsmAttemptedItems.addResults( bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)}); item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@ -153,17 +156,20 @@ public class TestBlockStorageMovementAttemptedItems {
} }
/** /**
* Partial block movement with only BlocksStorageMovementResult#FAILURE result * Partial block movement with only BlocksStorageMovementResult#FAILURE
* and storageMovementAttemptedItems list is empty. * result and storageMovementAttemptedItems list is empty.
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried3() throws Exception { public void testPartialBlockMovementWithEmptyAttemptedQueue()
throws Exception {
Long item = new Long(1234); Long item = new Long(1234);
bsmAttemptedItems.addResults( bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)}); item, BlocksStorageMovementResult.Status.FAILURE)});
bsmAttemptedItems.blockStorageMovementResultCheck(); bsmAttemptedItems.blockStorageMovementResultCheck();
assertTrue("Failed to add to the retry list", assertFalse(
"Should not add in queue again if it is not there in"
+ " storageMovementAttemptedItems",
checkItemMovedForRetry(item, 5000)); checkItemMovedForRetry(item, 5000));
assertEquals("Failed to remove from the attempted list", 0, assertEquals("Failed to remove from the attempted list", 0,
bsmAttemptedItems.getAttemptedItemsCount()); bsmAttemptedItems.getAttemptedItemsCount());
@ -176,7 +182,7 @@ public class TestBlockStorageMovementAttemptedItems {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried4() throws Exception { public void testPartialBlockMovementShouldBeRetried4() throws Exception {
Long item = new Long(1234); Long item = new Long(1234);
bsmAttemptedItems.add(item, false); bsmAttemptedItems.add(new ItemInfo(0L, item), false);
bsmAttemptedItems.addResults( bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)}); item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});

View File

@ -20,16 +20,22 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Supplier;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -481,6 +487,104 @@ public class TestPersistentStoragePolicySatisfier {
} }
} }
/**
* Test SPS xAttr on directory. xAttr should be removed from the directory
* once all the files blocks moved to specific storage.
*/
@Test(timeout = 300000)
public void testSPSxAttrWhenSpsCalledForDir() throws Exception {
try {
clusterSetUp();
Path parent = new Path("/parent");
// create parent dir
fs.mkdirs(parent);
// create 10 child files
for (int i = 0; i < 5; i++) {
DFSTestUtil.createFile(fs, new Path(parent, "f" + i), 1024, (short) 3,
0);
}
// Set storage policy for parent directory
fs.setStoragePolicy(parent, "COLD");
// Stop one DN so we can check the SPS xAttr for directory.
DataNodeProperties stopDataNode = cluster.stopDataNode(0);
fs.satisfyStoragePolicy(parent);
// Check xAttr for parent directory
FSNamesystem namesystem = cluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode("/parent");
XAttrFeature f = inode.getXAttrFeature();
assertTrue("SPS xAttr should be exist",
f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null);
// check for the child, SPS xAttr should not be there
for (int i = 0; i < 5; i++) {
inode = namesystem.getFSDirectory().getINode("/parent/f" + i);
f = inode.getXAttrFeature();
assertTrue(f == null);
}
cluster.restartDataNode(stopDataNode, false);
// wait and check all the file block moved in ARCHIVE
for (int i = 0; i < 5; i++) {
DFSTestUtil.waitExpectedStorageType("/parent/f" + i,
StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
}
DFSTestUtil.waitForXattrRemoved("/parent", XATTR_SATISFY_STORAGE_POLICY,
namesystem, 10000);
} finally {
clusterShutdown();
}
}
/**
* Test SPS xAttr on file. xAttr should be removed from the file
* once all the blocks moved to specific storage.
*/
@Test(timeout = 300000)
public void testSPSxAttrWhenSpsCalledForFile() throws Exception {
try {
clusterSetUp();
Path file = new Path("/file");
DFSTestUtil.createFile(fs, file, 1024, (short) 3, 0);
// Set storage policy for file
fs.setStoragePolicy(file, "COLD");
// Stop one DN so we can check the SPS xAttr for file.
DataNodeProperties stopDataNode = cluster.stopDataNode(0);
fs.satisfyStoragePolicy(file);
// Check xAttr for parent directory
FSNamesystem namesystem = cluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode("/file");
XAttrFeature f = inode.getXAttrFeature();
assertTrue("SPS xAttr should be exist",
f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null);
cluster.restartDataNode(stopDataNode, false);
// wait and check all the file block moved in ARCHIVE
DFSTestUtil.waitExpectedStorageType("/file", StorageType.ARCHIVE, 3,
30000, cluster.getFileSystem());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
return !existingXAttrs.contains(XATTR_SATISFY_STORAGE_POLICY);
}
}, 100, 10000);
} finally {
clusterShutdown();
}
}
/** /**
* Restart the hole env and trigger the DataNode's heart beats. * Restart the hole env and trigger the DataNode's heart beats.
* @throws Exception * @throws Exception

View File

@ -119,8 +119,6 @@ public class TestStoragePolicySatisfier {
private void doTestWhenStoragePolicySetToCOLD() throws Exception { private void doTestWhenStoragePolicySetToCOLD() throws Exception {
// Change policy to COLD // Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD); dfs.setStoragePolicy(new Path(file), COLD);
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes = StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}, new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
@ -129,7 +127,7 @@ public class TestStoragePolicySatisfier {
startAdditionalDNs(config, 3, numOfDatanodes, newtypes, startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster); storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats(); hdfsCluster.triggerHeartbeats();
// Wait till namenode notified about the block location details // Wait till namenode notified about the block location details
@ -144,8 +142,6 @@ public class TestStoragePolicySatisfier {
createCluster(); createCluster();
// Change policy to ALL_SSD // Change policy to ALL_SSD
dfs.setStoragePolicy(new Path(file), "ALL_SSD"); dfs.setStoragePolicy(new Path(file), "ALL_SSD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes = StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}, new StorageType[][]{{StorageType.SSD, StorageType.DISK},
@ -156,7 +152,7 @@ public class TestStoragePolicySatisfier {
// datanodes. // datanodes.
startAdditionalDNs(config, 3, numOfDatanodes, newtypes, startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster); storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats(); hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD // Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas // areas
@ -174,8 +170,6 @@ public class TestStoragePolicySatisfier {
createCluster(); createCluster();
// Change policy to ONE_SSD // Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD); dfs.setStoragePolicy(new Path(file), ONE_SSD);
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes = StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
@ -184,7 +178,7 @@ public class TestStoragePolicySatisfier {
// datanodes. // datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes, startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster); storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats(); hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD // Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas // areas
@ -207,8 +201,6 @@ public class TestStoragePolicySatisfier {
createCluster(); createCluster();
// Change policy to ONE_SSD // Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD); dfs.setStoragePolicy(new Path(file), ONE_SSD);
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes = StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
@ -217,7 +209,7 @@ public class TestStoragePolicySatisfier {
// datanodes. // datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes, startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster); storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats(); hdfsCluster.triggerHeartbeats();
// Wait till the block is moved to SSD areas // Wait till the block is moved to SSD areas
@ -250,13 +242,10 @@ public class TestStoragePolicySatisfier {
files.add(file1); files.add(file1);
writeContent(file1); writeContent(file1);
} }
FSNamesystem namesystem = hdfsCluster.getNamesystem();
List<Long> blockCollectionIds = new ArrayList<>();
// Change policy to ONE_SSD // Change policy to ONE_SSD
for (String fileName : files) { for (String fileName : files) {
dfs.setStoragePolicy(new Path(fileName), ONE_SSD); dfs.setStoragePolicy(new Path(fileName), ONE_SSD);
INode inode = namesystem.getFSDirectory().getINode(fileName); dfs.satisfyStoragePolicy(new Path(fileName));
blockCollectionIds.add(inode.getId());
} }
StorageType[][] newtypes = StorageType[][] newtypes =
@ -266,9 +255,6 @@ public class TestStoragePolicySatisfier {
// datanodes. // datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes, startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster); storagesPerDatanode, capacity, hdfsCluster);
for (long inodeId : blockCollectionIds) {
namesystem.getBlockManager().satisfyStoragePolicy(inodeId);
}
hdfsCluster.triggerHeartbeats(); hdfsCluster.triggerHeartbeats();
for (String fileName : files) { for (String fileName : files) {
@ -279,7 +265,7 @@ public class TestStoragePolicySatisfier {
fileName, StorageType.DISK, 2, 30000, dfs); fileName, StorageType.DISK, 2, 30000, dfs);
} }
waitForBlocksMovementResult(blockCollectionIds.size(), 30000); waitForBlocksMovementResult(files.size(), 30000);
} finally { } finally {
shutdownCluster(); shutdownCluster();
} }
@ -441,8 +427,6 @@ public class TestStoragePolicySatisfier {
createCluster(); createCluster();
// Change policy to COLD // Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD); dfs.setStoragePolicy(new Path(file), COLD);
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes = StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}}; new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}};
@ -451,7 +435,7 @@ public class TestStoragePolicySatisfier {
startAdditionalDNs(config, 1, numOfDatanodes, newtypes, startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster); storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats(); hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier identified that block to move to // Wait till StorgePolicySatisfier identified that block to move to
// ARCHIVE area. // ARCHIVE area.
@ -486,8 +470,6 @@ public class TestStoragePolicySatisfier {
createCluster(); createCluster();
// Change policy to COLD // Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD); dfs.setStoragePolicy(new Path(file), COLD);
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes = StorageType[][] newtypes =
new StorageType[][]{{StorageType.DISK, StorageType.DISK}}; new StorageType[][]{{StorageType.DISK, StorageType.DISK}};
@ -495,7 +477,7 @@ public class TestStoragePolicySatisfier {
startAdditionalDNs(config, 1, numOfDatanodes, newtypes, startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster); storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats(); hdfsCluster.triggerHeartbeats();
// No block movement will be scheduled as there is no target node // No block movement will be scheduled as there is no target node
@ -600,47 +582,51 @@ public class TestStoragePolicySatisfier {
*/ */
@Test(timeout = 120000) @Test(timeout = 120000)
public void testMoveWithBlockPinning() throws Exception { public void testMoveWithBlockPinning() throws Exception {
config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true); try{
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
true); config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3) true);
.storageTypes( hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3)
new StorageType[][] {{StorageType.DISK, StorageType.DISK}, .storageTypes(
{StorageType.DISK, StorageType.DISK}, new StorageType[][] {{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK}}) {StorageType.DISK, StorageType.DISK},
.build(); {StorageType.DISK, StorageType.DISK}})
.build();
hdfsCluster.waitActive(); hdfsCluster.waitActive();
dfs = hdfsCluster.getFileSystem(); dfs = hdfsCluster.getFileSystem();
// create a file with replication factor 3 and mark 2 pinned block // create a file with replication factor 3 and mark 2 pinned block
// locations. // locations.
final String file1 = createFileAndSimulateFavoredNodes(2); final String file1 = createFileAndSimulateFavoredNodes(2);
// Change policy to COLD // Change policy to COLD
dfs.setStoragePolicy(new Path(file1), COLD); dfs.setStoragePolicy(new Path(file1), COLD);
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file1);
StorageType[][] newtypes = StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}, new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}, {StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}}; {StorageType.ARCHIVE, StorageType.ARCHIVE}};
// Adding DISK based datanodes // Adding DISK based datanodes
startAdditionalDNs(config, 3, numOfDatanodes, newtypes, startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster); storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); dfs.satisfyStoragePolicy(new Path(file1));
hdfsCluster.triggerHeartbeats(); hdfsCluster.triggerHeartbeats();
// No block movement will be scheduled as there is no target node available // No block movement will be scheduled as there is no target node
// with the required storage type. // available with the required storage type.
waitForAttemptedItems(1, 30000); waitForAttemptedItems(1, 30000);
waitForBlocksMovementResult(1, 30000); waitForBlocksMovementResult(1, 30000);
DFSTestUtil.waitExpectedStorageType( DFSTestUtil.waitExpectedStorageType(
file1, StorageType.ARCHIVE, 1, 30000, dfs); file1, StorageType.ARCHIVE, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType( DFSTestUtil.waitExpectedStorageType(
file1, StorageType.DISK, 2, 30000, dfs); file1, StorageType.DISK, 2, 30000, dfs);
} finally {
if (hdfsCluster != null) {
hdfsCluster.shutdown();
}
}
} }
/** /**
@ -682,10 +668,8 @@ public class TestStoragePolicySatisfier {
// Change policy to COLD // Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD); dfs.setStoragePolicy(new Path(file), COLD);
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats(); hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier identified that block to move to // Wait till StorgePolicySatisfier identified that block to move to
// ARCHIVE area. // ARCHIVE area.
@ -723,10 +707,8 @@ public class TestStoragePolicySatisfier {
// Change policy to ONE_SSD // Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD); dfs.setStoragePolicy(new Path(file), ONE_SSD);
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats(); hdfsCluster.triggerHeartbeats();
DFSTestUtil.waitExpectedStorageType( DFSTestUtil.waitExpectedStorageType(
file, StorageType.SSD, 1, 30000, dfs); file, StorageType.SSD, 1, 30000, dfs);
@ -764,10 +746,7 @@ public class TestStoragePolicySatisfier {
// Change policy to WARM // Change policy to WARM
dfs.setStoragePolicy(new Path(file), "WARM"); dfs.setStoragePolicy(new Path(file), "WARM");
FSNamesystem namesystem = hdfsCluster.getNamesystem(); dfs.satisfyStoragePolicy(new Path(file));
INode inode = namesystem.getFSDirectory().getINode(file);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
hdfsCluster.triggerHeartbeats(); hdfsCluster.triggerHeartbeats();
DFSTestUtil.waitExpectedStorageType( DFSTestUtil.waitExpectedStorageType(
@ -848,8 +827,6 @@ public class TestStoragePolicySatisfier {
// Change policy to ONE_SSD // Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD); dfs.setStoragePolicy(new Path(file), ONE_SSD);
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
Path filePath = new Path("/testChooseInSameDatanode"); Path filePath = new Path("/testChooseInSameDatanode");
final FSDataOutputStream out = final FSDataOutputStream out =
dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE); dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE);
@ -872,7 +849,7 @@ public class TestStoragePolicySatisfier {
for (DataNode dataNode : dataNodes) { for (DataNode dataNode : dataNodes) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true); DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
} }
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); dfs.satisfyStoragePolicy(new Path(file));
// Wait for items to be processed // Wait for items to be processed
waitForAttemptedItems(1, 30000); waitForAttemptedItems(1, 30000);