HDFS-11572. [SPS]: SPS should clean Xattrs when no blocks required to satisfy for a file. Contributed by Uma Maheswara Rao G

This commit is contained in:
Rakesh Radhakrishnan 2017-04-20 23:14:36 +05:30 committed by Uma Maheswara Rao Gangumalla
parent 695a402fca
commit 6fe6c549e8
5 changed files with 225 additions and 56 deletions

View File

@ -333,7 +333,7 @@ void blockStorageMovementResultCheck() throws IOException {
+ "doesn't exists in storageMovementAttemptedItems list",
storageMovementAttemptedResult.getTrackId());
// Remove xattr for the track id.
this.sps.notifyBlkStorageMovementFinished(
this.sps.postBlkStorageMovementCleanup(
storageMovementAttemptedResult.getTrackId());
}
break;

View File

@ -79,6 +79,27 @@ public class StoragePolicySatisfier implements Runnable {
private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
private volatile boolean isRunning = false;
/**
* Represents the collective analysis status for all blocks.
*/
private enum BlocksMovingAnalysisStatus {
// Represents that, the analysis skipped due to some conditions. A such
// condition is if block collection is in incomplete state.
ANALYSIS_SKIPPED_FOR_RETRY,
// Represents that, all block storage movement needed blocks found its
// targets.
ALL_BLOCKS_TARGETS_PAIRED,
// Represents that, only fewer or none of the block storage movement needed
// block found its eligible targets.
FEW_BLOCKS_TARGETS_PAIRED,
// Represents that, none of the blocks found for block storage movements.
BLOCKS_ALREADY_SATISFIED,
// Represents that, the analysis skipped due to some conditions.
// Example conditions are if no blocks really exists in block collection or
// if analysis is not required on ec files with unsuitable storage policies
BLOCKS_TARGET_PAIRING_SKIPPED;
}
public StoragePolicySatisfier(final Namesystem namesystem,
final BlockStorageMovementNeeded storageMovementNeeded,
final BlockManager blkManager, Configuration conf) {
@ -208,10 +229,31 @@ public void run() {
namesystem.getBlockCollection(blockCollectionID);
// Check blockCollectionId existence.
if (blockCollection != null) {
boolean allBlockLocsAttemptedToSatisfy =
computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
this.storageMovementsMonitor
.add(blockCollectionID, allBlockLocsAttemptedToSatisfy);
BlocksMovingAnalysisStatus status =
analyseBlocksStorageMovementsAndAssignToDN(blockCollection);
switch (status) {
// Just add to monitor, so it will be retried after timeout
case ANALYSIS_SKIPPED_FOR_RETRY:
// Just add to monitor, so it will be tracked for result and
// be removed on successful storage movement result.
case ALL_BLOCKS_TARGETS_PAIRED:
this.storageMovementsMonitor.add(blockCollectionID, true);
break;
// Add to monitor with allBlcoksAttemptedToSatisfy flag false, so
// that it will be tracked and still it will be consider for retry
// as analysis was not found targets for storage movement blocks.
case FEW_BLOCKS_TARGETS_PAIRED:
this.storageMovementsMonitor.add(blockCollectionID, false);
break;
// Just clean Xattrs
case BLOCKS_TARGET_PAIRING_SKIPPED:
case BLOCKS_ALREADY_SATISFIED:
default:
LOG.info("Block analysis skipped or blocks already satisfied"
+ " with storages. So, Cleaning up the Xattrs.");
postBlkStorageMovementCleanup(blockCollectionID);
break;
}
}
}
}
@ -235,15 +277,15 @@ public void run() {
}
LOG.error("StoragePolicySatisfier thread received runtime exception. "
+ "Stopping Storage policy satisfier work", t);
// TODO: Just break for now. Once we implement dynamic start/stop
// option, we can add conditions here when to break/terminate.
break;
}
}
}
private boolean computeAndAssignStorageMismatchedBlocksToDNs(
private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN(
BlockCollection blockCollection) {
BlocksMovingAnalysisStatus status =
BlocksMovingAnalysisStatus.BLOCKS_ALREADY_SATISFIED;
byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
BlockStoragePolicy existingStoragePolicy =
blockManager.getStoragePolicy(existingStoragePolicyID);
@ -252,21 +294,20 @@ private boolean computeAndAssignStorageMismatchedBlocksToDNs(
// So, should we add back? or leave it to user
LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
+ " this to the next retry iteration", blockCollection.getId());
return true;
return BlocksMovingAnalysisStatus.ANALYSIS_SKIPPED_FOR_RETRY;
}
// First datanode will be chosen as the co-ordinator node for storage
// movements. Later this can be optimized if needed.
DatanodeDescriptor coordinatorNode = null;
BlockInfo[] blocks = blockCollection.getBlocks();
if (blocks.length == 0) {
LOG.info("BlockCollectionID: {} file is not having any blocks."
+ " So, skipping the analysis.", blockCollection.getId());
return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED;
}
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
// True value represents that, SPS is able to find matching target nodes
// to satisfy storage type for all the blocks locations of the given
// blockCollection. A false value represents that, blockCollection needed
// retries to satisfy the storage policy for some of the block locations.
boolean foundMatchingTargetNodesForAllBlocks = true;
for (int i = 0; i < blocks.length; i++) {
BlockInfo blockInfo = blocks[i];
List<StorageType> expectedStorageTypes;
@ -283,19 +324,38 @@ private boolean computeAndAssignStorageMismatchedBlocksToDNs(
LOG.warn("The storage policy " + existingStoragePolicy.getName()
+ " is not suitable for Striped EC files. "
+ "So, ignoring to move the blocks");
return false;
return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED;
}
} else {
expectedStorageTypes = existingStoragePolicy
.chooseStorageTypes(blockInfo.getReplication());
}
foundMatchingTargetNodesForAllBlocks |= computeBlockMovingInfos(
blockMovingInfos, blockInfo, expectedStorageTypes);
DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
StorageType[] storageTypes = new StorageType[storages.length];
for (int j = 0; j < storages.length; j++) {
DatanodeStorageInfo datanodeStorageInfo = storages[j];
StorageType storageType = datanodeStorageInfo.getStorageType();
storageTypes[j] = storageType;
}
List<StorageType> existing =
new LinkedList<StorageType>(Arrays.asList(storageTypes));
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
boolean computeStatus = computeBlockMovingInfos(blockMovingInfos,
blockInfo, expectedStorageTypes, existing, storages);
if (computeStatus
&& status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED) {
status = BlocksMovingAnalysisStatus.ALL_BLOCKS_TARGETS_PAIRED;
} else {
status = BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED;
}
}
}
assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
blockMovingInfos, coordinatorNode);
return foundMatchingTargetNodesForAllBlocks;
return status;
}
/**
@ -311,22 +371,18 @@ private boolean computeAndAssignStorageMismatchedBlocksToDNs(
* - block details
* @param expectedStorageTypes
* - list of expected storage type to satisfy the storage policy
* @param existing
* - list to get existing storage types
* @param storages
* - available storages
* @return false if some of the block locations failed to find target node to
* satisfy the storage policy, true otherwise
*/
private boolean computeBlockMovingInfos(
List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
List<StorageType> expectedStorageTypes) {
List<StorageType> expectedStorageTypes, List<StorageType> existing,
DatanodeStorageInfo[] storages) {
boolean foundMatchingTargetNodesForBlock = true;
DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
StorageType[] storageTypes = new StorageType[storages.length];
for (int j = 0; j < storages.length; j++) {
DatanodeStorageInfo datanodeStorageInfo = storages[j];
StorageType storageType = datanodeStorageInfo.getStorageType();
storageTypes[j] = storageType;
}
List<StorageType> existing =
new LinkedList<StorageType>(Arrays.asList(storageTypes));
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
List<StorageTypeNodePair> sourceWithStorageMap =
@ -756,7 +812,7 @@ private void clearQueuesWithNotification() {
Long id;
while ((id = storageMovementNeeded.get()) != null) {
try {
notifyBlkStorageMovementFinished(id);
postBlkStorageMovementCleanup(id);
} catch (IOException ie) {
LOG.warn("Failed to remove SPS "
+ "xattr for collection id " + id, ie);
@ -771,7 +827,7 @@ private void clearQueuesWithNotification() {
* @param trackId track id i.e., block collection id.
* @throws IOException
*/
public void notifyBlkStorageMovementFinished(long trackId)
public void postBlkStorageMovementCleanup(long trackId)
throws IOException {
this.namesystem.getFSDirectory().removeSPSXattr(trackId);
}

View File

@ -97,8 +97,11 @@
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
@ -155,9 +158,12 @@
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.XAttrStorage;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -175,6 +181,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UserGroupInformation;
@ -2456,4 +2463,32 @@ public Boolean get() {
}
}, 500, timeout);
}
/**
* Waits for removal of a specified Xattr on a specified file.
*
* @param srcPath
* file name.
* @param xattr
* name of the extended attribute.
* @param ns
* Namesystem
* @param timeout
* max wait time
* @throws Exception
*/
public static void waitForXattrRemoved(String srcPath, String xattr,
Namesystem ns, int timeout) throws TimeoutException, InterruptedException,
UnresolvedLinkException, AccessControlException,
ParentNotDirectoryException {
final INode inode = ns.getFSDirectory().getINode(srcPath);
final XAttr satisfyXAttr = XAttrHelper.buildXAttr(xattr);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
return !existingXAttrs.contains(satisfyXAttr);
}
}, 100, timeout);
}
}

View File

@ -20,22 +20,18 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.assertFalse;
/**
* Test persistence of satisfying files/directories.
@ -341,15 +337,9 @@ public void testMultipleSatisfyStoragePolicy() throws Exception {
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.DISK, 2, timeout, fs);
// Make sure that SPS xattr has been removed.
int retryTime = 0;
while (retryTime < 30) {
if (!fileContainsSPSXAttr(testFile)) {
break;
}
Thread.sleep(minCheckTimeout);
retryTime += 1;
}
// Make sure satisfy xattr has been removed.
DFSTestUtil.waitForXattrRemoved(testFileName,
XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
fs.setStoragePolicy(testFile, COLD);
fs.satisfyStoragePolicy(testFile);
@ -379,7 +369,8 @@ public void testDropSPS() throws Exception {
cluster.getNamesystem().getBlockManager().deactivateSPS();
// Make sure satisfy xattr has been removed.
assertFalse(fileContainsSPSXAttr(testFile));
DFSTestUtil.waitForXattrRemoved(testFileName,
XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
} finally {
clusterShutdown();
@ -387,18 +378,29 @@ public void testDropSPS() throws Exception {
}
/**
* Check whether file contains SPS xattr.
* @param fileName file name.
* @return true if file contains SPS xattr.
* @throws IOException
* Tests that Xattrs should be cleaned if all blocks already satisfied.
*
* @throws Exception
*/
private boolean fileContainsSPSXAttr(Path fileName) throws IOException {
final INode inode = cluster.getNamesystem()
.getFSDirectory().getINode(fileName.toString());
final XAttr satisfyXAttr =
XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
return existingXAttrs.contains(satisfyXAttr);
@Test(timeout = 300000)
public void testSPSShouldNotLeakXattrIfStorageAlreadySatisfied()
throws Exception {
try {
clusterSetUp();
DFSTestUtil.waitExpectedStorageType(testFileName, StorageType.DISK, 3,
timeout, fs);
fs.satisfyStoragePolicy(testFile);
DFSTestUtil.waitExpectedStorageType(testFileName, StorageType.DISK, 3,
timeout, fs);
// Make sure satisfy xattr has been removed.
DFSTestUtil.waitForXattrRemoved(testFileName,
XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
} finally {
clusterShutdown();
}
}
/**

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.assertNull;
import java.io.FileNotFoundException;
@ -34,13 +35,17 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@ -828,6 +833,77 @@ public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace()
}
}
/**
* Tests that Xattrs should be cleaned if satisfy storage policy called on EC
* file with unsuitable storage policy set.
*
* @throws Exception
*/
@Test(timeout = 300000)
public void testSPSShouldNotLeakXattrIfSatisfyStoragePolicyCallOnECFiles()
throws Exception {
StorageType[][] diskTypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK},
{StorageType.DISK, StorageType.SSD},
{StorageType.DISK, StorageType.SSD},
{StorageType.DISK, StorageType.SSD},
{StorageType.DISK, StorageType.SSD},
{StorageType.DISK, StorageType.SSD}};
int defaultStripedBlockSize =
ErasureCodingPolicyManager.getSystemPolicies()[0].getCellSize() * 4;
config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultStripedBlockSize);
config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
config.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
1L);
config.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
false);
try {
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
storagesPerDatanode, capacity);
// set "/foo" directory with ONE_SSD storage policy.
ClientProtocol client = NameNodeProxies.createProxy(config,
hdfsCluster.getFileSystem(0).getUri(), ClientProtocol.class)
.getProxy();
String fooDir = "/foo";
client.mkdirs(fooDir, new FsPermission((short) 777), true);
// set an EC policy on "/foo" directory
client.setErasureCodingPolicy(fooDir, null);
// write file to fooDir
final String testFile = "/foo/bar";
long fileLen = 20 * defaultStripedBlockSize;
dfs = hdfsCluster.getFileSystem();
DFSTestUtil.createFile(dfs, new Path(testFile), fileLen, (short) 3, 0);
// ONESSD is unsuitable storage policy on EC files
client.setStoragePolicy(fooDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
dfs.satisfyStoragePolicy(new Path(testFile));
// Thread.sleep(9000); // To make sure SPS triggered
// verify storage types and locations
LocatedBlocks locatedBlocks =
client.getBlockLocations(testFile, 0, fileLen);
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
for (StorageType type : lb.getStorageTypes()) {
Assert.assertEquals(StorageType.DISK, type);
}
}
// Make sure satisfy xattr has been removed.
DFSTestUtil.waitForXattrRemoved(testFile, XATTR_SATISFY_STORAGE_POLICY,
hdfsCluster.getNamesystem(), 30000);
} finally {
shutdownCluster();
}
}
private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
throws IOException {
ArrayList<DataNode> dns = hdfsCluster.getDataNodes();