HDFS-11150: [SPS]: Provide persistence when satisfying storage policy. Contributed by Yuanbo Liu

This commit is contained in:
Uma Maheswara Rao G 2017-01-11 13:48:58 -08:00 committed by Uma Maheswara Rao Gangumalla
parent df2b551e79
commit 6215e35bb6
12 changed files with 532 additions and 94 deletions

View File

@ -1754,7 +1754,7 @@ public interface ClientProtocol {
* @throws org.apache.hadoop.hdfs.server.namenode.SafeModeException append not
* allowed in safemode.
*/
@Idempotent
@AtMostOnce
void satisfyStoragePolicy(String path) throws IOException;
/**

View File

@ -365,6 +365,9 @@ public interface HdfsServerConstants {
String XATTR_ERASURECODING_POLICY =
"system.hdfs.erasurecoding.policy";
String XATTR_SATISFY_STORAGE_POLICY =
"system.hdfs.satisfy.storage.policy";
Path MOVER_ID_PATH = new Path("/system/mover.id");
long BLOCK_GROUP_INDEX_MASK = 15;

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@ -42,12 +43,14 @@ import com.google.common.collect.Lists;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
public class FSDirAttrOp {
static FileStatus setPermission(
@ -190,10 +193,11 @@ public class FSDirAttrOp {
return fsd.getAuditFileInfo(iip);
}
static void satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
String src) throws IOException {
static FileStatus satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
String src, boolean logRetryCache) throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
INodesInPath iip;
fsd.writeLock();
try {
@ -203,10 +207,13 @@ public class FSDirAttrOp {
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
unprotectedSatisfyStoragePolicy(bm, iip);
XAttr satisfyXAttr = unprotectedSatisfyStoragePolicy(iip, bm, fsd);
xAttrs.add(satisfyXAttr);
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
return fsd.getAuditFileInfo(iip);
}
static BlockStoragePolicy[] getStoragePolicies(BlockManager bm)
@ -470,33 +477,61 @@ public class FSDirAttrOp {
}
}
static void unprotectedSatisfyStoragePolicy(BlockManager bm,
INodesInPath iip) throws IOException {
static XAttr unprotectedSatisfyStoragePolicy(INodesInPath iip,
BlockManager bm, FSDirectory fsd) throws IOException {
// check whether file exists.
INode inode = iip.getLastINode();
if (inode == null) {
throw new FileNotFoundException("File/Directory does not exist: "
+ iip.getPath());
}
final INode inode = FSDirectory.resolveLastINode(iip);
final int snapshotId = iip.getLatestSnapshotId();
final List<INode> candidateNodes = new ArrayList<>();
// TODO: need to check whether inode's storage policy
// has been satisfied or inode exists in the satisfier
// list before calling satisfyStoragePolicy in BlockManager.
if (inode.isDirectory()) {
final int snapshotId = iip.getLatestSnapshotId();
// TODO: think about optimization here, label the dir instead
// of the sub-files of the dir.
if (inode.isFile()) {
candidateNodes.add(inode);
} else if (inode.isDirectory()) {
for (INode node : inode.asDirectory().getChildrenList(snapshotId)) {
if (node.isFile()) {
bm.satisfyStoragePolicy(node.getId());
candidateNodes.add(node);
}
}
} else if (inode.isFile()) {
bm.satisfyStoragePolicy(inode.getId());
} else {
throw new FileNotFoundException("File/Directory does not exist: "
+ iip.getPath());
}
// If node has satisfy xattr, then stop adding it
// to satisfy movement queue.
if (inodeHasSatisfyXAttr(candidateNodes)) {
throw new IOException(
"Cannot request to call satisfy storage policy on path "
+ iip.getPath()
+ ", as this file/dir was already called for satisfying "
+ "storage policy.");
}
final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
final XAttr satisfyXAttr =
XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
xattrs.add(satisfyXAttr);
for (INode node : candidateNodes) {
bm.satisfyStoragePolicy(node.getId());
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(node);
List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(
fsd, existingXAttrs, xattrs, EnumSet.of(XAttrSetFlag.CREATE));
XAttrStorage.updateINodeXAttrs(node, newXAttrs, snapshotId);
}
return satisfyXAttr;
}
private static boolean inodeHasSatisfyXAttr(List<INode> candidateNodes) {
// If the node is a directory and one of the child files
// has satisfy xattr, then return true for this directory.
for (INode inode : candidateNodes) {
final XAttrFeature f = inode.getXAttrFeature();
if (inode.isFile() &&
f != null && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) {
return true;
}
}
return false;
}
private static void setDirStoragePolicy(

View File

@ -42,6 +42,7 @@ import java.util.ListIterator;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
class FSDirXAttrOp {
private static final XAttr KEYID_XATTR =
@ -294,6 +295,13 @@ class FSDirXAttrOp {
}
}
// Add inode id to movement queue if xattrs contain satisfy xattr.
if (XATTR_SATISFY_STORAGE_POLICY.equals(xaName)) {
FSDirAttrOp.unprotectedSatisfyStoragePolicy(iip,
fsd.getBlockManager(), fsd);
continue;
}
if (!isFile && SECURITY_XATTR_UNREADABLE_BY_SUPERUSER.equals(xaName)) {
throw new IOException("Can only set '" +
SECURITY_XATTR_UNREADABLE_BY_SUPERUSER + "' on a file.");

View File

@ -91,6 +91,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DE
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
/**
@ -1400,10 +1401,23 @@ public class FSDirectory implements Closeable {
if (!inode.isSymlink()) {
final XAttrFeature xaf = inode.getXAttrFeature();
addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
}
}
}
private void addStoragePolicySatisfier(INodeWithAdditionalFields inode,
XAttrFeature xaf) {
if (xaf == null || inode.isDirectory()) {
return;
}
XAttr xattr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY);
if (xattr == null) {
return;
}
getBlockManager().satisfyStoragePolicy(inode.getId());
}
private void addEncryptionZone(INodeWithAdditionalFields inode,
XAttrFeature xaf) {
if (xaf == null) {

View File

@ -2233,7 +2233,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*
* @param src file/directory path
*/
void satisfyStoragePolicy(String src) throws IOException {
void satisfyStoragePolicy(String src, boolean logRetryCache)
throws IOException {
checkOperation(OperationCategory.WRITE);
writeLock();
try {
@ -2255,8 +2256,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ " by admin. Seek for an admin help to activate it "
+ "or use Mover tool.");
}
// TODO: need to update editlog for persistence.
FSDirAttrOp.satisfyStoragePolicy(dir, blockManager, src);
FSDirAttrOp.satisfyStoragePolicy(dir, blockManager, src, logRetryCache);
} finally {
writeUnlock();
}

View File

@ -1408,7 +1408,18 @@ public class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void satisfyStoragePolicy(String src) throws IOException {
checkNNStartup();
namesystem.satisfyStoragePolicy(src);
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.satisfyStoragePolicy(src, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol

View File

@ -168,16 +168,18 @@ public class StoragePolicySatisfier implements Runnable {
}
while (namesystem.isRunning() && isRunning) {
try {
Long blockCollectionID = storageMovementNeeded.get();
if (blockCollectionID != null) {
BlockCollection blockCollection =
namesystem.getBlockCollection(blockCollectionID);
// Check blockCollectionId existence.
if (blockCollection != null) {
boolean allBlockLocsAttemptedToSatisfy =
computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
this.storageMovementsMonitor.add(blockCollectionID,
allBlockLocsAttemptedToSatisfy);
if (!namesystem.isInSafeMode()) {
Long blockCollectionID = storageMovementNeeded.get();
if (blockCollectionID != null) {
BlockCollection blockCollection =
namesystem.getBlockCollection(blockCollectionID);
// Check blockCollectionId existence.
if (blockCollection != null) {
boolean allBlockLocsAttemptedToSatisfy =
computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
this.storageMovementsMonitor
.add(blockCollectionID, allBlockLocsAttemptedToSatisfy);
}
}
}
// TODO: We can think to make this as configurable later, how frequently

View File

@ -2419,4 +2419,41 @@ public class DFSTestUtil {
}
}
}
/**
* Check whether the Block movement has been successfully
* completed to satisfy the storage policy for the given file.
* @param fileName file name.
* @param expectedStorageType storage type.
* @param expectedStorageCount expected storage type.
* @param timeout timeout.
* @param fs distributedFileSystem.
* @throws Exception
*/
public static void waitExpectedStorageType(String fileName,
final StorageType expectedStorageType, int expectedStorageCount,
int timeout, DistributedFileSystem fs) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
final LocatedBlock lb;
try {
lb = fs.getClient().getLocatedBlocks(fileName, 0).get(0);
} catch (IOException e) {
LOG.error("Exception while getting located blocks", e);
return false;
}
int actualStorageCount = 0;
for(StorageType type : lb.getStorageTypes()) {
if (expectedStorageType == type) {
actualStorageCount++;
}
}
LOG.info(
expectedStorageType + " replica count, expected="
+ expectedStorageCount + " and actual=" + actualStorageCount);
return expectedStorageCount == actualStorageCount;
}
}, 1000, timeout);
}
}

View File

@ -0,0 +1,311 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.junit.Test;
import java.io.IOException;
/**
* Test persistence of satisfying files/directories.
*/
public class TestPersistentStoragePolicySatisfier {
private static Configuration conf;
private static MiniDFSCluster cluster;
private static DistributedFileSystem fs;
private static Path testFile =
new Path("/testFile");
private static String testFileName = testFile.toString();
private static Path parentDir = new Path("/parentDir");
private static Path parentFile = new Path(parentDir, "parentFile");
private static String parentFileName = parentFile.toString();
private static Path childDir = new Path(parentDir, "childDir");
private static Path childFile = new Path(childDir, "childFile");
private static String childFileName = childFile.toString();
private static final String COLD = "COLD";
private static final String WARM = "WARM";
private static final String ONE_SSD = "ONE_SSD";
private static final String ALL_SSD = "ALL_SSD";
private static StorageType[][] storageTypes = new StorageType[][] {
{StorageType.ARCHIVE, StorageType.DISK},
{StorageType.DISK, StorageType.SSD},
{StorageType.SSD, StorageType.RAM_DISK},
{StorageType.ARCHIVE, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.SSD}
};
private final int timeout = 300000;
/**
* Setup environment for every test case.
* @throws IOException
*/
public void clusterSetUp() throws Exception {
clusterSetUp(false);
}
/**
* Setup cluster environment.
* @param isHAEnabled if true, enable simple HA.
* @throws IOException
*/
private void clusterSetUp(boolean isHAEnabled) throws Exception {
conf = new HdfsConfiguration();
final int dnNumber = storageTypes.length;
final short replication = 3;
MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf)
.storageTypes(storageTypes)
.numDataNodes(dnNumber);
if (isHAEnabled) {
clusterBuilder.nnTopology(MiniDFSNNTopology.simpleHATopology());
}
cluster = clusterBuilder.build();
cluster.waitActive();
if (isHAEnabled) {
cluster.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
} else {
fs = cluster.getFileSystem();
}
createTestFiles(fs, replication);
}
/**
* Setup test files for testing.
* @param dfs
* @param replication
* @throws Exception
*/
private void createTestFiles(DistributedFileSystem dfs,
short replication) throws Exception {
DFSTestUtil.createFile(dfs, testFile, 1024L, replication, 0L);
DFSTestUtil.createFile(dfs, parentFile, 1024L, replication, 0L);
DFSTestUtil.createFile(dfs, childFile, 1024L, replication, 0L);
DFSTestUtil.waitReplication(dfs, testFile, replication);
DFSTestUtil.waitReplication(dfs, parentFile, replication);
DFSTestUtil.waitReplication(dfs, childFile, replication);
}
/**
* Tear down environment for every test case.
* @throws IOException
*/
private void clusterShutdown() throws IOException{
if(fs != null) {
fs.close();
fs = null;
}
if(cluster != null) {
cluster.shutdown(true);
cluster = null;
}
}
/**
* While satisfying file/directory, trigger the cluster's checkpoint to
* make sure satisfier persistence work as expected. This test case runs
* as below:
* 1. use satisfyStoragePolicy and add xAttr to the file.
* 2. do the checkpoint by secondary NameNode.
* 3. restart the cluster immediately.
* 4. make sure all the storage policies are satisfied.
* @throws Exception
*/
@Test(timeout = 300000)
public void testWithCheckpoint() throws Exception {
try {
clusterSetUp();
fs.setStoragePolicy(testFile, WARM);
fs.satisfyStoragePolicy(testFile);
// Start the checkpoint.
conf.set(
DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
SecondaryNameNode secondary = new SecondaryNameNode(conf);
secondary.doCheckpoint();
restartCluster();
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.DISK, 1, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.ARCHIVE, 2, timeout, fs);
fs.setStoragePolicy(parentDir, COLD);
fs.satisfyStoragePolicy(parentDir);
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
childFileName, StorageType.DEFAULT, 3, timeout, fs);
} finally {
clusterShutdown();
}
}
/**
* Tests to verify satisfier persistence working as expected
* in HA env. This test case runs as below:
* 1. setup HA cluster env with simple HA topology.
* 2. switch the active NameNode from nn0/nn1 to nn1/nn0.
* 3. make sure all the storage policies are satisfied.
* @throws Exception
*/
@Test(timeout = 300000)
public void testWithHA() throws Exception {
try {
// Enable HA env for testing.
clusterSetUp(true);
fs.setStoragePolicy(testFile, ALL_SSD);
fs.satisfyStoragePolicy(testFile);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.SSD, 3, timeout, fs);
// test directory
fs.setStoragePolicy(parentDir, WARM);
fs.satisfyStoragePolicy(parentDir);
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.DISK, 1, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 2, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
childFileName, StorageType.DEFAULT, 3, timeout, fs);
} finally {
clusterShutdown();
}
}
/**
* Tests to verify satisfier persistence working well with multiple
* restarts operations. This test case runs as below:
* 1. satisfy the storage policy of file1.
* 2. restart the cluster.
* 3. check whether all the blocks are satisfied.
* 4. satisfy the storage policy of file2.
* 5. restart the cluster.
* 6. check whether all the blocks are satisfied.
* @throws Exception
*/
@Test(timeout = 300000)
public void testWithRestarts() throws Exception {
try {
clusterSetUp();
fs.setStoragePolicy(testFile, ONE_SSD);
fs.satisfyStoragePolicy(testFile);
restartCluster();
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.SSD, 1, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.DISK, 2, timeout, fs);
// test directory
fs.setStoragePolicy(parentDir, COLD);
fs.satisfyStoragePolicy(parentDir);
restartCluster();
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
childFileName, StorageType.DEFAULT, 3, timeout, fs);
} finally {
clusterShutdown();
}
}
/**
* Tests to verify satisfier persistence working well with
* federal HA env. This test case runs as below:
* 1. setup HA test environment with federal topology.
* 2. satisfy storage policy of file1.
* 3. switch active NameNode from nn0 to nn1.
* 4. switch active NameNode from nn2 to nn3.
* 5. check whether the storage policy of file1 is satisfied.
* @throws Exception
*/
@Test(timeout = 300000)
public void testWithFederationHA() throws Exception {
try {
conf = new HdfsConfiguration();
final MiniDFSCluster haCluster = new MiniDFSCluster
.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
.storageTypes(storageTypes)
.numDataNodes(storageTypes.length).build();
haCluster.waitActive();
haCluster.transitionToActive(1);
haCluster.transitionToActive(3);
fs = HATestUtil.configureFailoverFs(haCluster, conf);
createTestFiles(fs, (short) 3);
fs.setStoragePolicy(testFile, WARM);
fs.satisfyStoragePolicy(testFile);
haCluster.transitionToStandby(1);
haCluster.transitionToActive(0);
haCluster.transitionToStandby(3);
haCluster.transitionToActive(2);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.DISK, 1, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.ARCHIVE, 2, timeout, fs);
} finally {
clusterShutdown();
}
}
/**
* Restart the hole env and trigger the DataNode's heart beats.
* @throws Exception
*/
private void restartCluster() throws Exception {
cluster.restartDataNodes();
cluster.restartNameNodes();
cluster.waitActive();
cluster.triggerHeartbeats();
}
}

View File

@ -108,7 +108,8 @@ public class TestStoragePolicySatisfier {
hdfsCluster.triggerHeartbeats();
// Wait till namenode notified about the block location details
waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.ARCHIVE, 3, 30000, dfs);
} finally {
shutdownCluster();
}
@ -137,7 +138,8 @@ public class TestStoragePolicySatisfier {
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas
waitExpectedStorageType(file, StorageType.SSD, 3, 30000);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.SSD, 3, 30000, dfs);
} finally {
shutdownCluster();
}
@ -164,8 +166,10 @@ public class TestStoragePolicySatisfier {
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas
waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
} finally {
shutdownCluster();
}
@ -195,8 +199,10 @@ public class TestStoragePolicySatisfier {
hdfsCluster.triggerHeartbeats();
// Wait till the block is moved to SSD areas
waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
waitForBlocksMovementResult(1, 30000);
} finally {
@ -245,8 +251,10 @@ public class TestStoragePolicySatisfier {
for (String fileName : files) {
// Wait till the block is moved to SSD areas
waitExpectedStorageType(fileName, StorageType.SSD, 1, 30000);
waitExpectedStorageType(fileName, StorageType.DISK, 2, 30000);
DFSTestUtil.waitExpectedStorageType(
fileName, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
fileName, StorageType.DISK, 2, 30000, dfs);
}
waitForBlocksMovementResult(blockCollectionIds.size(), 30000);
@ -279,7 +287,8 @@ public class TestStoragePolicySatisfier {
hdfsCluster.triggerHeartbeats();
// Wait till namenode notified about the block location details
waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.ARCHIVE, 3, 30000, dfs);
} finally {
shutdownCluster();
}
@ -317,11 +326,14 @@ public class TestStoragePolicySatisfier {
hdfsCluster.triggerHeartbeats();
// take effect for the file in the directory.
waitExpectedStorageType(subFile1, StorageType.SSD, 1, 30000);
waitExpectedStorageType(subFile1, StorageType.DISK, 2, 30000);
DFSTestUtil.waitExpectedStorageType(
subFile1, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
subFile1, StorageType.DISK, 2, 30000, dfs);
// take no effect for the sub-dir's file in the directory.
waitExpectedStorageType(subFile2, StorageType.DEFAULT, 3, 30000);
DFSTestUtil.waitExpectedStorageType(
subFile2, StorageType.DEFAULT, 3, 30000, dfs);
} finally {
shutdownCluster();
}
@ -367,6 +379,20 @@ public class TestStoragePolicySatisfier {
} catch (FileNotFoundException e) {
}
try {
hdfsAdmin.satisfyStoragePolicy(new Path(file));
hdfsAdmin.satisfyStoragePolicy(new Path(file));
Assert.fail(String.format(
"Should failed to satisfy storage policy "
+ "for %s ,since it has been "
+ "added to satisfy movement queue.", file));
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
String.format("Cannot request to call satisfy storage policy "
+ "on path %s, as this file/dir was already called for "
+ "satisfying storage policy.", file), e);
}
} finally {
shutdownCluster();
}
@ -407,8 +433,10 @@ public class TestStoragePolicySatisfier {
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier identified that block to move to
// ARCHIVE area.
waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000);
waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.ARCHIVE, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
waitForBlocksMovementResult(1, 30000);
} finally {
@ -451,7 +479,8 @@ public class TestStoragePolicySatisfier {
// No block movement will be scheduled as there is no target node available
// with the required storage type.
waitForAttemptedItems(1, 30000);
waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 3, 30000, dfs);
// Since there is no target node the item will get timed out and then
// re-attempted.
waitForAttemptedItems(1, 30000);
@ -523,8 +552,10 @@ public class TestStoragePolicySatisfier {
// with the required storage type.
waitForAttemptedItems(1, 30000);
waitForBlocksMovementResult(1, 30000);
waitExpectedStorageType(file1, StorageType.ARCHIVE, 1, 30000);
waitExpectedStorageType(file1, StorageType.DISK, 2, 30000);
DFSTestUtil.waitExpectedStorageType(
file1, StorageType.ARCHIVE, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file1, StorageType.DISK, 2, 30000, dfs);
}
/**
@ -571,8 +602,10 @@ public class TestStoragePolicySatisfier {
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier identified that block to move to
// ARCHIVE area.
waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.ARCHIVE, 2, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 3, 30000, dfs);
waitForBlocksMovementResult(1, 30000);
} finally {
@ -606,8 +639,10 @@ public class TestStoragePolicySatisfier {
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
hdfsCluster.triggerHeartbeats();
waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.SSD, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 2, 30000, dfs);
} finally {
shutdownCluster();
@ -644,8 +679,10 @@ public class TestStoragePolicySatisfier {
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
hdfsCluster.triggerHeartbeats();
waitExpectedStorageType(file, StorageType.DISK, 1, 30000);
waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.DISK, 1, 30000, dfs);
DFSTestUtil.waitExpectedStorageType(
file, StorageType.ARCHIVE, 2, 30000, dfs);
} finally {
shutdownCluster();
}
@ -771,33 +808,4 @@ public class TestStoragePolicySatisfier {
cluster.waitActive();
return cluster;
}
// Check whether the Block movement has been successfully completed to satisfy
// the storage policy for the given file.
private void waitExpectedStorageType(final String fileName,
final StorageType expectedStorageType, int expectedStorageCount,
int timeout) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LocatedBlock lb = null;
try {
lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0);
} catch (IOException e) {
LOG.error("Exception while getting located blocks", e);
return false;
}
int actualStorageCount = 0;
for (StorageType storageType : lb.getStorageTypes()) {
if (expectedStorageType == storageType) {
actualStorageCount++;
}
}
LOG.info(
expectedStorageType + " replica count, expected={} and actual={}",
expectedStorageType, actualStorageCount);
return expectedStorageCount == actualStorageCount;
}
}, 100, timeout);
}
}

View File

@ -66,7 +66,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
private int defaultStripeBlockSize;
private ErasureCodingPolicy getEcPolicy() {
return ErasureCodingPolicyManager.getSystemDefaultPolicy();
return StripedFileTestUtil.getDefaultECPolicy();
}
/**
@ -99,6 +99,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
}
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
StripedFileTestUtil.getDefaultECPolicy().getName());
initConfWithStripe(conf, defaultStripeBlockSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
@ -128,7 +130,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
client.mkdirs(barDir, new FsPermission((short) 777), true);
client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
// set an EC policy on "/bar" directory
client.setErasureCodingPolicy(barDir, null);
client.setErasureCodingPolicy(barDir,
StripedFileTestUtil.getDefaultECPolicy().getName());
// write file to barDir
final String fooFile = "/bar/foo";
@ -206,6 +209,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
}
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
StripedFileTestUtil.getDefaultECPolicy().getName());
initConfWithStripe(conf, defaultStripeBlockSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
@ -235,7 +240,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
client.mkdirs(barDir, new FsPermission((short) 777), true);
client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
// set an EC policy on "/bar" directory
client.setErasureCodingPolicy(barDir, null);
client.setErasureCodingPolicy(barDir,
StripedFileTestUtil.getDefaultECPolicy().getName());
// write file to barDir
final String fooFile = "/bar/foo";
@ -314,6 +320,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
}
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
StripedFileTestUtil.getDefaultECPolicy().getName());
initConfWithStripe(conf, defaultStripeBlockSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
@ -343,7 +351,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
client.mkdirs(barDir, new FsPermission((short) 777), true);
client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
// set an EC policy on "/bar" directory
client.setErasureCodingPolicy(barDir, null);
client.setErasureCodingPolicy(barDir,
StripedFileTestUtil.getDefaultECPolicy().getName());
// write file to barDir
final String fooFile = "/bar/foo";