HDFS-12291: [SPS]: Provide a mechanism to recursively iterate and satisfy storage policy of all the files under the given dir. Contributed by Surendra Singh Lilhore.

This commit is contained in:
Uma Maheswara Rao G 2017-09-30 06:31:52 -07:00 committed by Uma Maheswara Rao Gangumalla
parent 7ea24fc06c
commit bfd3f8bd8a
11 changed files with 689 additions and 83 deletions

View File

@ -618,6 +618,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.storage.policy.satisfier.enabled";
public static final boolean DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT =
false;
public static final String DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY =
"dfs.storage.policy.satisfier.queue.limit";
public static final int DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT =
1000;
public static final String DFS_SPS_WORK_MULTIPLIER_PER_ITERATION =
"dfs.storage.policy.satisfier.work.multiplier.per.iteration";
public static final int DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT =
1;
public static final String DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY =
"dfs.storage.policy.satisfier.recheck.timeout.millis";
public static final int DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT =

View File

@ -1457,7 +1457,27 @@ public static int getReplWorkMultiplier(Configuration conf) {
"It should be a positive, non-zero integer value.");
return blocksReplWorkMultiplier;
}
/**
* Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from
* configuration.
*
* @param conf Configuration
* @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION
*/
public static int getSPSWorkMultiplier(Configuration conf) {
int spsWorkMultiplier = conf
.getInt(
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION,
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
Preconditions.checkArgument(
(spsWorkMultiplier > 0),
DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION +
" = '" + spsWorkMultiplier + "' is invalid. " +
"It should be a positive, non-zero integer value.");
return spsWorkMultiplier;
}
/**
* Get SPNEGO keytab Key from configuration
*

View File

@ -101,7 +101,7 @@ public BlockStorageMovementAttemptedItems(long recheckTimeout,
public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
synchronized (storageMovementAttemptedItems) {
AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
itemInfo.getRootId(), itemInfo.getTrackId(), monotonicNow(),
itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(),
allBlockLocsAttemptedToSatisfy);
storageMovementAttemptedItems.put(itemInfo.getTrackId(),
attemptedItemInfo);
@ -260,7 +260,7 @@ void blocksStorageMovementUnReportedItemsCheck() {
synchronized (storageMovementAttemptedResults) {
if (!isExistInResult(blockCollectionID)) {
ItemInfo candidate = new ItemInfo(
itemInfo.getRootId(), blockCollectionID);
itemInfo.getStartId(), blockCollectionID);
blockStorageMovementNeeded.add(candidate);
iter.remove();
LOG.info("TrackID: {} becomes timed out and moved to needed "
@ -315,7 +315,7 @@ void blockStorageMovementResultCheck() throws IOException {
// blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning
// the xAttr
ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
? attemptedItemInfo.getRootId() : trackId, trackId);
? attemptedItemInfo.getStartId() : trackId, trackId);
switch (status) {
case FAILURE:
if (attemptedItemInfo != null) {
@ -345,7 +345,7 @@ void blockStorageMovementResultCheck() throws IOException {
if (attemptedItemInfo != null) {
if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
blockStorageMovementNeeded
.add(new ItemInfo(attemptedItemInfo.getRootId(), trackId));
.add(new ItemInfo(attemptedItemInfo.getStartId(), trackId));
LOG.warn("{} But adding trackID back to retry queue as some of"
+ " the blocks couldn't find matching target nodes in"
+ " previous SPS iteration.", msg);

View File

@ -29,12 +29,15 @@
import java.util.Queue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* A class to track the block collection IDs (inode IDs) for which physical
* storage movement is needed, as per the namespace and the storage reports from DNs.
@ -53,11 +56,11 @@ public class BlockStorageMovementNeeded {
new LinkedList<ItemInfo>();
/**
* Map of rootId and number of child's. Number of child's indicate the number
* of files pending to satisfy the policy.
* Map of startId and number of children. The number of children indicates
* the number of files pending to satisfy the policy.
*/
private final Map<Long, Integer> pendingWorkForDirectory =
new HashMap<Long, Integer>();
private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
new HashMap<Long, DirPendingWorkInfo>();
private final Namesystem namesystem;
@ -66,12 +69,15 @@ public class BlockStorageMovementNeeded {
private final StoragePolicySatisfier sps;
private Daemon fileInodeIdCollector;
private Daemon inodeIdCollector;
private final int maxQueuedItem;
public BlockStorageMovementNeeded(Namesystem namesystem,
StoragePolicySatisfier sps) {
StoragePolicySatisfier sps, int queueLimit) {
this.namesystem = namesystem;
this.sps = sps;
this.maxQueuedItem = queueLimit;
}
/**
@ -88,15 +94,24 @@ public synchronized void add(ItemInfo trackInfo) {
/**
* Add the itemInfo list to the tracking list for which storage movement
* is expected, if necessary.
* @param rootId
* - root inode id
* @param startId
* - start id
* @param itemInfoList
* - list of children in the directory
*/
private synchronized void addAll(Long rootId,
List<ItemInfo> itemInfoList) {
@VisibleForTesting
public synchronized void addAll(long startId,
List<ItemInfo> itemInfoList, boolean scanCompleted) {
storageMovementNeeded.addAll(itemInfoList);
pendingWorkForDirectory.put(rootId, itemInfoList.size());
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
if (pendingWork == null) {
pendingWork = new DirPendingWorkInfo();
pendingWorkForDirectory.put(startId, pendingWork);
}
pendingWork.addPendingWorkCount(itemInfoList.size());
if (scanCompleted) {
pendingWork.markScanCompleted();
}
}
/**
@ -118,6 +133,25 @@ public synchronized void addToPendingDirQueue(long id) {
}
}
/**
* Returns queue remaining capacity.
*/
public synchronized int remainingCapacity() {
int size = storageMovementNeeded.size();
if (size >= maxQueuedItem) {
return 0;
} else {
return (maxQueuedItem - size);
}
}
/**
* Returns queue size.
*/
public synchronized int size() {
return storageMovementNeeded.size();
}
public synchronized void clearAll() {
spsDirsToBeTraveresed.clear();
storageMovementNeeded.clear();
@ -131,20 +165,20 @@ public synchronized void clearAll() {
public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
throws IOException {
if (trackInfo.isDir()) {
// If track is part of some root then reduce the pending directory work
// count.
long rootId = trackInfo.getRootId();
INode inode = namesystem.getFSDirectory().getInode(rootId);
// If track is part of some start inode then reduce the pending
// directory work count.
long startId = trackInfo.getStartId();
INode inode = namesystem.getFSDirectory().getInode(startId);
if (inode == null) {
// directory deleted just remove it.
this.pendingWorkForDirectory.remove(rootId);
this.pendingWorkForDirectory.remove(startId);
} else {
if (pendingWorkForDirectory.get(rootId) != null) {
Integer pendingWork = pendingWorkForDirectory.get(rootId) - 1;
pendingWorkForDirectory.put(rootId, pendingWork);
if (pendingWork <= 0) {
namesystem.removeXattr(rootId, XATTR_SATISFY_STORAGE_POLICY);
pendingWorkForDirectory.remove(rootId);
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
if (pendingWork != null) {
pendingWork.decrementPendingWorkCount();
if (pendingWork.isDirWorkDone()) {
namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
pendingWorkForDirectory.remove(startId);
}
}
}
@ -161,7 +195,7 @@ public synchronized void clearQueue(long trackId) {
Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
while (iterator.hasNext()) {
ItemInfo next = iterator.next();
if (next.getRootId() == trackId) {
if (next.getStartId() == trackId) {
iterator.remove();
}
}
@ -208,7 +242,17 @@ public synchronized void clearQueuesWithNotification() {
* Take the directory track ID from the spsDirsToBeTraveresed queue and
* collect the child IDs to process to satisfy the policy.
*/
private class FileInodeIdCollector implements Runnable {
private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
implements Runnable {
private int remainingCapacity = 0;
private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem);
StorageMovementPendingInodeIdCollector(FSDirectory dir) {
super(dir);
}
@Override
public void run() {
LOG.info("Starting FileInodeIdCollector!.");
@ -216,38 +260,36 @@ public void run() {
try {
if (!namesystem.isInSafeMode()) {
FSDirectory fsd = namesystem.getFSDirectory();
Long rootINodeId = spsDirsToBeTraveresed.poll();
if (rootINodeId == null) {
Long startINodeId = spsDirsToBeTraveresed.poll();
if (startINodeId == null) {
// Waiting for SPS path
synchronized (spsDirsToBeTraveresed) {
spsDirsToBeTraveresed.wait(5000);
}
} else {
INode rootInode = fsd.getInode(rootINodeId);
if (rootInode != null) {
// TODO : HDFS-12291
// 1. Implement an efficient recursive directory iteration
// mechanism and satisfies storage policy for all the files
// under the given directory.
// 2. Process files in batches,so datanodes workload can be
// handled.
List<ItemInfo> itemInfoList =
new ArrayList<>();
for (INode childInode : rootInode.asDirectory()
.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
if (childInode.isFile()
&& childInode.asFile().numBlocks() != 0) {
itemInfoList.add(
new ItemInfo(rootINodeId, childInode.getId()));
}
INode startInode = fsd.getInode(startINodeId);
if (startInode != null) {
try {
remainingCapacity = remainingCapacity();
readLock();
traverseDir(startInode.asDirectory(), startINodeId,
HdfsFileStatus.EMPTY_NAME,
new SPSTraverseInfo(startINodeId));
} finally {
readUnlock();
}
if (itemInfoList.isEmpty()) {
// satisfy track info is empty, so remove the xAttr from the
// directory
namesystem.removeXattr(rootINodeId,
// Mark that the startInode traversal is done
addAll(startInode.getId(), currentBatch, true);
currentBatch.clear();
// Check if the directory was empty and no children were added to the queue
DirPendingWorkInfo dirPendingWorkInfo =
pendingWorkForDirectory.get(startInode.getId());
if (dirPendingWorkInfo.isDirWorkDone()) {
namesystem.removeXattr(startInode.getId(),
XATTR_SATISFY_STORAGE_POLICY);
pendingWorkForDirectory.remove(startInode.getId());
}
addAll(rootINodeId, itemInfoList);
}
}
}
@ -256,17 +298,140 @@ public void run() {
}
}
}
@Override
protected void checkPauseForTesting() throws InterruptedException {
// TODO implement if needed
}
@Override
protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
throws IOException, InterruptedException {
assert getFSDirectory().hasReadLock();
if (LOG.isTraceEnabled()) {
LOG.trace("Processing {} for statisy the policy",
inode.getFullPathName());
}
if (!inode.isFile()) {
return false;
}
if (inode.isFile() && inode.asFile().numBlocks() != 0) {
currentBatch.add(new ItemInfo(
((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
remainingCapacity--;
}
return true;
}
@Override
protected boolean canSubmitCurrentBatch() {
return remainingCapacity <= 0;
}
@Override
protected void checkINodeReady(long startId) throws IOException {
FSNamesystem fsn = ((FSNamesystem) namesystem);
fsn.checkNameNodeSafeMode("NN is in safe mode,"
+ " cannot satisfy the policy.");
// SPS work should be cancelled when NN goes to standby. Just
// double checking for sanity.
fsn.checkOperation(NameNode.OperationCategory.WRITE);
}
@Override
protected void submitCurrentBatch(long startId)
throws IOException, InterruptedException {
// Add the current batch of children to the queue
addAll(startId, currentBatch, false);
currentBatch.clear();
}
@Override
protected void throttle() throws InterruptedException {
assert !getFSDirectory().hasReadLock();
assert !namesystem.hasReadLock();
if (LOG.isDebugEnabled()) {
LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
+ " waiting for some free slots.");
}
remainingCapacity = remainingCapacity();
// wait for queue to be free
while (remainingCapacity <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for storageMovementNeeded queue to be free!");
}
Thread.sleep(5000);
remainingCapacity = remainingCapacity();
}
}
@Override
protected boolean canTraverseDir(INode inode) throws IOException {
return true;
}
}
public void start() {
fileInodeIdCollector = new Daemon(new FileInodeIdCollector());
fileInodeIdCollector.setName("FileInodeIdCollector");
fileInodeIdCollector.start();
/**
* Info for directory recursive scan.
*/
public static class DirPendingWorkInfo {
private int pendingWorkCount = 0;
private boolean fullyScanned = false;
/**
* Increment the pending work count for directory.
*/
public synchronized void addPendingWorkCount(int count) {
this.pendingWorkCount = this.pendingWorkCount + count;
}
/**
* Decrement the pending work count for the directory once one track info
* is completed.
*/
public synchronized void decrementPendingWorkCount() {
this.pendingWorkCount--;
}
/**
* Return true if all the pending work is done and the directory is fully
* scanned, otherwise false.
*/
public synchronized boolean isDirWorkDone() {
return (pendingWorkCount <= 0 && fullyScanned);
}
/**
* Mark the directory scan as completed.
*/
public synchronized void markScanCompleted() {
this.fullyScanned = true;
}
}
public void stop() {
if (fileInodeIdCollector != null) {
fileInodeIdCollector.interrupt();
public void init() {
inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
namesystem.getFSDirectory()));
inodeIdCollector.setName("FileInodeIdCollector");
inodeIdCollector.start();
}
public void close() {
if (inodeIdCollector != null) {
inodeIdCollector.interrupt();
}
}
class SPSTraverseInfo extends TraverseInfo {
private long startId;
SPSTraverseInfo(long startId) {
this.startId = startId;
}
public long getStartId() {
return startId;
}
}
}

View File

@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;

View File

@ -77,7 +77,8 @@ public class StoragePolicySatisfier implements Runnable {
private final BlockStorageMovementNeeded storageMovementNeeded;
private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
private volatile boolean isRunning = false;
private int spsWorkMultiplier;
private long blockCount = 0L;
/**
* Represents the collective analysis status for all blocks.
*/
@ -106,7 +107,9 @@ public StoragePolicySatisfier(final Namesystem namesystem,
final BlockManager blkManager, Configuration conf) {
this.namesystem = namesystem;
this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
this);
this, conf.getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT));
this.blockManager = blkManager;
this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
conf.getLong(
@ -117,6 +120,7 @@ public StoragePolicySatisfier(final Namesystem namesystem,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
storageMovementNeeded,
this);
this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
}
/**
@ -143,7 +147,7 @@ public synchronized void start(boolean reconfigStart) {
// Ensure that all the previously submitted block movements(if any) have to
// be stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs();
storageMovementNeeded.start();
storageMovementNeeded.init();
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
@ -164,7 +168,7 @@ public synchronized void disable(boolean forceStop) {
return;
}
storageMovementNeeded.stop();
storageMovementNeeded.close();
storagePolicySatisfierThread.interrupt();
this.storageMovementsMonitor.stop();
@ -268,9 +272,13 @@ public void run() {
}
}
}
// TODO: We can think to make this as configurable later, how frequently
// we want to check block movements.
Thread.sleep(3000);
int numLiveDn = namesystem.getFSDirectory().getBlockManager()
.getDatanodeManager().getNumLiveDataNodes();
if (storageMovementNeeded.size() == 0
|| blockCount > (numLiveDn * spsWorkMultiplier)) {
Thread.sleep(3000);
blockCount = 0L;
}
} catch (Throwable t) {
handleException(t);
}
@ -380,6 +388,11 @@ private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN(
assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
blockMovingInfos, coordinatorNode);
int count = 0;
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
count = count + blkMovingInfo.getSources().length;
}
blockCount = blockCount + count;
return status;
}
@ -840,7 +853,7 @@ public void clearQueues() {
* - file inode/blockcollection id.
*/
public void satisfyStoragePolicy(Long inodeId) {
//For file rootId and trackId is same
// For a file, startId and trackId are the same
storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
if (LOG.isDebugEnabled()) {
LOG.debug("Added track info for inode {} to block "
@ -864,19 +877,19 @@ public void clearQueue(long trackId) {
* policy.
*/
public static class ItemInfo {
private long rootId;
private long startId;
private long trackId;
public ItemInfo(long rootId, long trackId) {
this.rootId = rootId;
public ItemInfo(long startId, long trackId) {
this.startId = startId;
this.trackId = trackId;
}
/**
* Return the root of the current track Id.
* Return the start inode id of the current track Id.
*/
public long getRootId() {
return rootId;
public long getStartId() {
return startId;
}
/**
@ -890,7 +903,7 @@ public long getTrackId() {
* Returns true if the tracking path is a directory, false otherwise.
*/
public boolean isDir() {
return (rootId != trackId);
return (startId != trackId);
}
}
}

View File

@ -4509,6 +4509,29 @@
</description>
</property>
<property>
<name>dfs.storage.policy.satisfier.queue.limit</name>
<value>1000</value>
<description>
Storage policy satisfier queue size. This queue contains the inode IDs of
the files currently scheduled for satisfying the policy.
Default value is 1000.
</description>
</property>
<property>
<name>dfs.storage.policy.satisfier.work.multiplier.per.iteration</name>
<value>1</value>
<description>
*Note*: Advanced property. Change with caution.
This determines the total number of block transfers to begin in
one iteration, for satisfying the policy. The actual number is obtained by
multiplying this multiplier with the total number of live DataNodes in the
cluster. The result is the number of block transfers to begin
immediately. This number can be any positive, non-zero integer.
</description>
</property>
<property>
<name>dfs.storage.policy.satisfier.recheck.timeout.millis</name>
<value>300000</value>

View File

@ -112,7 +112,7 @@ SPS can be enabled and disabled dynamically without restarting the Namenode.
Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HDFS-10285)](https://issues.apache.org/jira/browse/HDFS-10285)
* **Note**: When a user invokes the `satisfyStoragePolicy()` API on a directory, SPS will consider only the files immediately under that directory. Sub-directories won't be considered for satisfying the policy. It is the user's responsibility to call this API on directories recursively, to track all files under the sub-tree.
* **Note**: When a user invokes the `satisfyStoragePolicy()` API on a directory, SPS will scan all sub-directories and consider all the files for satisfying the policy.
* HdfsAdmin API :
`public void satisfyStoragePolicy(final Path path) throws IOException`
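
For reference, a minimal usage sketch of this API (not part of this commit). The NameNode URI, directory path, and policy name are hypothetical placeholders; it sets a policy through FileSystem#setStoragePolicy and then asks SPS to satisfy it via HdfsAdmin.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsAdmin;

public class SatisfyPolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical NameNode URI and directory; replace with real cluster values.
    URI nnUri = URI.create("hdfs://namenode:8020");
    Path dir = new Path("/data/cold");
    try (FileSystem fs = FileSystem.get(nnUri, conf)) {
      // Change the directory's storage policy, e.g. to COLD.
      fs.setStoragePolicy(dir, "COLD");
      // Ask SPS to move the already-written blocks to match the new policy.
      // With this change, the whole sub-tree under the directory is scanned.
      new HdfsAdmin(nnUri, conf).satisfyStoragePolicy(dir);
    }
  }
}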
@ -214,7 +214,6 @@ Get the storage policy of a file or a directory.
### Satisfy Storage Policy
Schedule blocks to move based on file's/directory's current storage policy.
Note: For directory case, it will consider immediate files under that directory and it will not consider sub directories recursively.
* Command:

View File

@ -41,7 +41,7 @@ public class TestBlockStorageMovementAttemptedItems {
public void setup() throws Exception {
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
Mockito.mock(Namesystem.class),
Mockito.mock(StoragePolicySatisfier.class));
Mockito.mock(StoragePolicySatisfier.class), 100);
StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);

View File

@ -191,7 +191,7 @@ public void testWithCheckpoint() throws Exception {
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
childFileName, StorageType.DEFAULT, 3, timeout, fs);
childFileName, StorageType.ARCHIVE, 3, timeout, fs);
} finally {
clusterShutdown();
@ -232,7 +232,9 @@ public void testWithHA() throws Exception {
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 2, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
childFileName, StorageType.DEFAULT, 3, timeout, fs);
childFileName, StorageType.DISK, 1, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
childFileName, StorageType.ARCHIVE, 2, timeout, fs);
} finally {
clusterShutdown();
}
@ -269,7 +271,7 @@ public void testWithRestarts() throws Exception {
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
childFileName, StorageType.DEFAULT, 3, timeout, fs);
childFileName, StorageType.ARCHIVE, 3, timeout, fs);
} finally {
clusterShutdown();
}

View File

@ -21,6 +21,9 @@
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -61,8 +64,10 @@
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import com.google.common.base.Supplier;
@ -71,6 +76,12 @@
* moved and finding its suggested target locations to move.
*/
public class TestStoragePolicySatisfier {
{
GenericTestUtils.setLogLevel(
getLogger(FSTreeTraverser.class), Level.DEBUG);
}
private static final String ONE_SSD = "ONE_SSD";
private static final String COLD = "COLD";
private static final Logger LOG =
@ -341,7 +352,9 @@ public void testSatisfyDirWithHdfsAdmin() throws Exception {
// take no effect for the sub-dir's file in the directory.
DFSTestUtil.waitExpectedStorageType(
subFile2, StorageType.DEFAULT, 3, 30000, dfs);
subFile2, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
subFile2, StorageType.DISK, 2, 30000, dfs);
} finally {
shutdownCluster();
}
@ -1083,6 +1096,368 @@ public void testSPSWhenFileHasExcessRedundancyBlocks() throws Exception {
}
}
/**
* Test SPS for empty directory, xAttr should be removed.
*/
@Test(timeout = 300000)
public void testSPSForEmptyDirectory() throws IOException, TimeoutException,
InterruptedException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
Path emptyDir = new Path("/emptyDir");
fs.mkdirs(emptyDir);
fs.satisfyStoragePolicy(emptyDir);
// Make sure satisfy xattr has been removed.
DFSTestUtil.waitForXattrRemoved("/emptyDir",
XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test SPS for a non-existent directory.
*/
@Test(timeout = 300000)
public void testSPSForNonExistDirectory() throws Exception {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
Path emptyDir = new Path("/emptyDir");
try {
fs.satisfyStoragePolicy(emptyDir);
fail("FileNotFoundException should throw");
} catch (FileNotFoundException e) {
// nothing to do
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test SPS for directory tree which doesn't have files.
*/
@Test(timeout = 300000)
public void testSPSWithDirectoryTreeWithoutFile() throws Exception {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
// Create directories
/*
* root
* |
* A--------C--------D
* |
* G----H----I
* |
* O
*/
DistributedFileSystem fs = cluster.getFileSystem();
fs.mkdirs(new Path("/root/C/H/O"));
fs.mkdirs(new Path("/root/A"));
fs.mkdirs(new Path("/root/D"));
fs.mkdirs(new Path("/root/C/G"));
fs.mkdirs(new Path("/root/C/I"));
fs.satisfyStoragePolicy(new Path("/root"));
// Make sure satisfy xattr has been removed.
DFSTestUtil.waitForXattrRemoved("/root",
XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test SPS for a directory which has multiple levels of sub-directories.
*/
@Test(timeout = 300000)
public void testMultipleLevelDirectoryForSatisfyStoragePolicy()
throws Exception {
try {
StorageType[][] diskTypes = new StorageType[][] {
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.SSD},
{StorageType.DISK, StorageType.DISK}};
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
List<String> files = getDFSListOfTree();
dfs.setStoragePolicy(new Path("/root"), COLD);
dfs.satisfyStoragePolicy(new Path("/root"));
for (String fileName : files) {
// Wait till the block is moved to ARCHIVE
DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
30000, dfs);
}
} finally {
shutdownCluster();
}
}
/**
* Test SPS for batch processing.
*/
@Test(timeout = 300000)
public void testBatchProcessingForSPSDirectory() throws Exception {
try {
StorageType[][] diskTypes = new StorageType[][] {
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.SSD},
{StorageType.DISK, StorageType.DISK}};
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
// Set queue max capacity
config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
5);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
List<String> files = getDFSListOfTree();
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory
.getLog(FSTreeTraverser.class));
dfs.setStoragePolicy(new Path("/root"), COLD);
dfs.satisfyStoragePolicy(new Path("/root"));
for (String fileName : files) {
// Wait till the block is moved to ARCHIVE
DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
30000, dfs);
}
waitForBlocksMovementResult(files.size(), 30000);
String expectedLogMessage = "StorageMovementNeeded queue remaining"
+ " capacity is zero";
assertTrue("Log output does not contain expected log message: "
+ expectedLogMessage, logs.getOutput().contains(expectedLogMessage));
} finally {
shutdownCluster();
}
}
/**
* Test traverse when parent got deleted.
* 1. Delete /root when traversing Q
* 2. U, R, S should not be in queued.
*/
@Test
public void testTraverseWhenParentDeleted() throws Exception {
StorageType[][] diskTypes = new StorageType[][] {
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.SSD},
{StorageType.DISK, StorageType.DISK}};
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
List<String> expectedTraverseOrder = getDFSListOfTree();
// Remove files which will not be traversed when the parent is deleted
expectedTraverseOrder.remove("/root/D/L/R");
expectedTraverseOrder.remove("/root/D/L/S");
expectedTraverseOrder.remove("/root/D/L/Q/U");
FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
// The queue limit makes the traversal logic wait for a free entry in the
// queue. After 10 files, traversal control will be at U.
StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
Mockito.when(sps.isRunning()).thenReturn(true);
BlockStorageMovementNeeded movmentNeededQueue =
new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
INode rootINode = fsDir.getINode("/root");
movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
movmentNeededQueue.init();
//Wait for thread to reach U.
Thread.sleep(1000);
dfs.delete(new Path("/root/D/L"), true);
// Remove 10 elements to free the queue, so the remaining traversal will start.
for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0);
long trackId = movmentNeededQueue.get().getTrackId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
}
// Wait for the tree traversal to finish
Thread.sleep(5000);
// Check that the other elements are traversed in order; R and S, which we
// already removed from the expected list, should not be added to the queue
for (String path : expectedTraverseOrder) {
long trackId = movmentNeededQueue.get().getTrackId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
}
dfs.delete(new Path("/root"), true);
}
/**
* Test traversal when the root's parent directory is deleted.
* 1. Delete L when traversing Q.
* 2. E, M, U, R, S should not be queued.
*/
@Test
public void testTraverseWhenRootParentDeleted() throws Exception {
StorageType[][] diskTypes = new StorageType[][] {
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.SSD},
{StorageType.DISK, StorageType.DISK}};
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
List<String> expectedTraverseOrder = getDFSListOfTree();
// Remove files which will not be traversed when the parent is deleted
expectedTraverseOrder.remove("/root/D/L/R");
expectedTraverseOrder.remove("/root/D/L/S");
expectedTraverseOrder.remove("/root/D/L/Q/U");
expectedTraverseOrder.remove("/root/D/M");
expectedTraverseOrder.remove("/root/E");
FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
Mockito.when(sps.isRunning()).thenReturn(true);
// The queue limit makes the traversal logic wait for a free entry in the
// queue. After 10 files, traversal control will be at U.
BlockStorageMovementNeeded movmentNeededQueue =
new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
movmentNeededQueue.init();
INode rootINode = fsDir.getINode("/root");
movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
// Wait for thread to reach U.
Thread.sleep(1000);
dfs.delete(new Path("/root/D/L"), true);
// Remove 10 elements to free the queue, so the remaining traversal will start.
for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0);
long trackId = movmentNeededQueue.get().getTrackId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
}
// Wait for the tree traversal to finish
Thread.sleep(5000);
// Check that the other elements are traversed in order; E, M, U, R and S,
// which we already removed from the expected list, should not be added to
// the queue
for (String path : expectedTraverseOrder) {
long trackId = movmentNeededQueue.get().getTrackId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
}
dfs.delete(new Path("/root"), true);
}
private static void createDirectoryTree(DistributedFileSystem dfs)
throws Exception {
// tree structure
/*
* root
* |
* A--------B--------C--------D--------E
* | |
* F----G----H----I J----K----L----M
* | |
* N----O----P Q----R----S
* | |
* T U
*/
// Create the root node and its children
dfs.mkdirs(new Path("/root"));
DFSTestUtil.createFile(dfs, new Path("/root/A"), 1024, (short) 3, 0);
dfs.mkdirs(new Path("/root/B"));
DFSTestUtil.createFile(dfs, new Path("/root/C"), 1024, (short) 3, 0);
dfs.mkdirs(new Path("/root/D"));
DFSTestUtil.createFile(dfs, new Path("/root/E"), 1024, (short) 3, 0);
// Create /root/B child
DFSTestUtil.createFile(dfs, new Path("/root/B/F"), 1024, (short) 3, 0);
dfs.mkdirs(new Path("/root/B/G"));
DFSTestUtil.createFile(dfs, new Path("/root/B/H"), 1024, (short) 3, 0);
DFSTestUtil.createFile(dfs, new Path("/root/B/I"), 1024, (short) 3, 0);
// Create /root/D child
DFSTestUtil.createFile(dfs, new Path("/root/D/J"), 1024, (short) 3, 0);
DFSTestUtil.createFile(dfs, new Path("/root/D/K"), 1024, (short) 3, 0);
dfs.mkdirs(new Path("/root/D/L"));
DFSTestUtil.createFile(dfs, new Path("/root/D/M"), 1024, (short) 3, 0);
// Create /root/B/G child
DFSTestUtil.createFile(dfs, new Path("/root/B/G/N"), 1024, (short) 3, 0);
DFSTestUtil.createFile(dfs, new Path("/root/B/G/O"), 1024, (short) 3, 0);
dfs.mkdirs(new Path("/root/B/G/P"));
// Create /root/D/L child
dfs.mkdirs(new Path("/root/D/L/Q"));
DFSTestUtil.createFile(dfs, new Path("/root/D/L/R"), 1024, (short) 3, 0);
DFSTestUtil.createFile(dfs, new Path("/root/D/L/S"), 1024, (short) 3, 0);
// Create /root/B/G/P child
DFSTestUtil.createFile(dfs, new Path("/root/B/G/P/T"), 1024, (short) 3, 0);
// Create /root/D/L/Q child
DFSTestUtil.createFile(dfs, new Path("/root/D/L/Q/U"), 1024, (short) 3, 0);
}
private List<String> getDFSListOfTree() {
List<String> dfsList = new ArrayList<>();
dfsList.add("/root/A");
dfsList.add("/root/B/F");
dfsList.add("/root/B/G/N");
dfsList.add("/root/B/G/O");
dfsList.add("/root/B/G/P/T");
dfsList.add("/root/B/H");
dfsList.add("/root/B/I");
dfsList.add("/root/C");
dfsList.add("/root/D/J");
dfsList.add("/root/D/K");
dfsList.add("/root/D/L/Q/U");
dfsList.add("/root/D/L/R");
dfsList.add("/root/D/L/S");
dfsList.add("/root/D/M");
dfsList.add("/root/E");
return dfsList;
}
private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
throws IOException {
ArrayList<DataNode> dns = hdfsCluster.getDataNodes();