diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 4b95c3dfe3d..81d7c91f8d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -1754,7 +1754,7 @@ BatchedEntries listOpenFiles(long prevId, * @throws org.apache.hadoop.hdfs.server.namenode.SafeModeException append not * allowed in safemode. */ - @Idempotent + @AtMostOnce void satisfyStoragePolicy(String path) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index e486317017e..42a2fc6ff10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -365,6 +365,9 @@ enum BlockUCState { String XATTR_ERASURECODING_POLICY = "system.hdfs.erasurecoding.policy"; + String XATTR_SATISFY_STORAGE_POLICY = + "system.hdfs.satisfy.storage.policy"; + Path MOVER_ID_PATH = new Path("/system/mover.id"); long BLOCK_GROUP_INDEX_MASK = 15; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java index fbf93283fbb..500314b6b4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; @@ -42,12 +43,14 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.List; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; public class FSDirAttrOp { static FileStatus setPermission( @@ -190,10 +193,11 @@ static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc, return fsd.getAuditFileInfo(iip); } - static void satisfyStoragePolicy(FSDirectory fsd, BlockManager bm, - String src) throws IOException { + static FileStatus satisfyStoragePolicy(FSDirectory fsd, BlockManager bm, + String src, boolean logRetryCache) throws IOException { FSPermissionChecker pc = fsd.getPermissionChecker(); + List xAttrs = Lists.newArrayListWithCapacity(1); INodesInPath iip; fsd.writeLock(); try { @@ -203,10 +207,13 @@ static void satisfyStoragePolicy(FSDirectory fsd, BlockManager bm, if (fsd.isPermissionEnabled()) { fsd.checkPathAccess(pc, iip, FsAction.WRITE); } - unprotectedSatisfyStoragePolicy(bm, iip); + XAttr satisfyXAttr = unprotectedSatisfyStoragePolicy(iip, bm, fsd); + xAttrs.add(satisfyXAttr); } finally { fsd.writeUnlock(); } + fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache); + return fsd.getAuditFileInfo(iip); } static BlockStoragePolicy[] getStoragePolicies(BlockManager bm) @@ -470,33 +477,61 @@ static void unprotectedSetStoragePolicy(FSDirectory fsd, BlockManager bm, } } - static void unprotectedSatisfyStoragePolicy(BlockManager bm, - INodesInPath iip) throws IOException { + static XAttr unprotectedSatisfyStoragePolicy(INodesInPath iip, + BlockManager bm, FSDirectory fsd) throws IOException { - // check whether file exists. - INode inode = iip.getLastINode(); - if (inode == null) { - throw new FileNotFoundException("File/Directory does not exist: " - + iip.getPath()); - } + final INode inode = FSDirectory.resolveLastINode(iip); + final int snapshotId = iip.getLatestSnapshotId(); + final List candidateNodes = new ArrayList<>(); - // TODO: need to check whether inode's storage policy - // has been satisfied or inode exists in the satisfier - // list before calling satisfyStoragePolicy in BlockManager. - if (inode.isDirectory()) { - final int snapshotId = iip.getLatestSnapshotId(); + // TODO: think about optimization here, label the dir instead + // of the sub-files of the dir. + if (inode.isFile()) { + candidateNodes.add(inode); + } else if (inode.isDirectory()) { for (INode node : inode.asDirectory().getChildrenList(snapshotId)) { if (node.isFile()) { - bm.satisfyStoragePolicy(node.getId()); - + candidateNodes.add(node); } } - } else if (inode.isFile()) { - bm.satisfyStoragePolicy(inode.getId()); - } else { - throw new FileNotFoundException("File/Directory does not exist: " - + iip.getPath()); } + + // If node has satisfy xattr, then stop adding it + // to satisfy movement queue. + if (inodeHasSatisfyXAttr(candidateNodes)) { + throw new IOException( + "Cannot request to call satisfy storage policy on path " + + iip.getPath() + + ", as this file/dir was already called for satisfying " + + "storage policy."); + } + + final List xattrs = Lists.newArrayListWithCapacity(1); + final XAttr satisfyXAttr = + XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY); + xattrs.add(satisfyXAttr); + + for (INode node : candidateNodes) { + bm.satisfyStoragePolicy(node.getId()); + List existingXAttrs = XAttrStorage.readINodeXAttrs(node); + List newXAttrs = FSDirXAttrOp.setINodeXAttrs( + fsd, existingXAttrs, xattrs, EnumSet.of(XAttrSetFlag.CREATE)); + XAttrStorage.updateINodeXAttrs(node, newXAttrs, snapshotId); + } + return satisfyXAttr; + } + + private static boolean inodeHasSatisfyXAttr(List candidateNodes) { + // If the node is a directory and one of the child files + // has satisfy xattr, then return true for this directory. + for (INode inode : candidateNodes) { + final XAttrFeature f = inode.getXAttrFeature(); + if (inode.isFile() && + f != null && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) { + return true; + } + } + return false; } private static void setDirStoragePolicy( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java index 9e95f90d866..cd2662d1e54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java @@ -42,6 +42,7 @@ import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER; +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; class FSDirXAttrOp { private static final XAttr KEYID_XATTR = @@ -294,6 +295,13 @@ static INode unprotectedSetXAttrs( } } + // Add inode id to movement queue if xattrs contain satisfy xattr. + if (XATTR_SATISFY_STORAGE_POLICY.equals(xaName)) { + FSDirAttrOp.unprotectedSatisfyStoragePolicy(iip, + fsd.getBlockManager(), fsd); + continue; + } + if (!isFile && SECURITY_XATTR_UNREADABLE_BY_SUPERUSER.equals(xaName)) { throw new IOException("Can only set '" + SECURITY_XATTR_UNREADABLE_BY_SUPERUSER + "' on a file."); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 1b1448252ca..1a061056488 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -91,6 +91,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER; +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID; /** @@ -1400,10 +1401,23 @@ public final void addToInodeMap(INode inode) { if (!inode.isSymlink()) { final XAttrFeature xaf = inode.getXAttrFeature(); addEncryptionZone((INodeWithAdditionalFields) inode, xaf); + addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf); } } } + private void addStoragePolicySatisfier(INodeWithAdditionalFields inode, + XAttrFeature xaf) { + if (xaf == null || inode.isDirectory()) { + return; + } + XAttr xattr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY); + if (xattr == null) { + return; + } + getBlockManager().satisfyStoragePolicy(inode.getId()); + } + private void addEncryptionZone(INodeWithAdditionalFields inode, XAttrFeature xaf) { if (xaf == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index a7fb2131f80..02e47af3846 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2233,7 +2233,8 @@ void setStoragePolicy(String src, String policyName) throws IOException { * * @param src file/directory path */ - void satisfyStoragePolicy(String src) throws IOException { + void satisfyStoragePolicy(String src, boolean logRetryCache) + throws IOException { checkOperation(OperationCategory.WRITE); writeLock(); try { @@ -2255,8 +2256,7 @@ void satisfyStoragePolicy(String src) throws IOException { + " by admin. Seek for an admin help to activate it " + "or use Mover tool."); } - // TODO: need to update editlog for persistence. - FSDirAttrOp.satisfyStoragePolicy(dir, blockManager, src); + FSDirAttrOp.satisfyStoragePolicy(dir, blockManager, src, logRetryCache); } finally { writeUnlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index b6c590827f7..d3c5cb115b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1408,7 +1408,18 @@ public QuotaUsage getQuotaUsage(String path) throws IOException { @Override // ClientProtocol public void satisfyStoragePolicy(String src) throws IOException { checkNNStartup(); - namesystem.satisfyStoragePolicy(src); + namesystem.checkOperation(OperationCategory.WRITE); + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } + boolean success = false; + try { + namesystem.satisfyStoragePolicy(src, cacheEntry != null); + success = true; + } finally { + RetryCache.setState(cacheEntry, success); + } } @Override // ClientProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index b1b14648cf4..3b19833c363 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -168,16 +168,18 @@ public void run() { } while (namesystem.isRunning() && isRunning) { try { - Long blockCollectionID = storageMovementNeeded.get(); - if (blockCollectionID != null) { - BlockCollection blockCollection = - namesystem.getBlockCollection(blockCollectionID); - // Check blockCollectionId existence. - if (blockCollection != null) { - boolean allBlockLocsAttemptedToSatisfy = - computeAndAssignStorageMismatchedBlocksToDNs(blockCollection); - this.storageMovementsMonitor.add(blockCollectionID, - allBlockLocsAttemptedToSatisfy); + if (!namesystem.isInSafeMode()) { + Long blockCollectionID = storageMovementNeeded.get(); + if (blockCollectionID != null) { + BlockCollection blockCollection = + namesystem.getBlockCollection(blockCollectionID); + // Check blockCollectionId existence. + if (blockCollection != null) { + boolean allBlockLocsAttemptedToSatisfy = + computeAndAssignStorageMismatchedBlocksToDNs(blockCollection); + this.storageMovementsMonitor + .add(blockCollectionID, allBlockLocsAttemptedToSatisfy); + } } } // TODO: We can think to make this as configurable later, how frequently diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index e6a2a002527..afe351810ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -2419,4 +2419,41 @@ public static void verifySnapshotDiffReport(DistributedFileSystem fs, } } } + + /** + * Check whether the Block movement has been successfully + * completed to satisfy the storage policy for the given file. + * @param fileName file name. + * @param expectedStorageType storage type. + * @param expectedStorageCount expected storage type. + * @param timeout timeout. + * @param fs distributedFileSystem. + * @throws Exception + */ + public static void waitExpectedStorageType(String fileName, + final StorageType expectedStorageType, int expectedStorageCount, + int timeout, DistributedFileSystem fs) throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + final LocatedBlock lb; + try { + lb = fs.getClient().getLocatedBlocks(fileName, 0).get(0); + } catch (IOException e) { + LOG.error("Exception while getting located blocks", e); + return false; + } + int actualStorageCount = 0; + for(StorageType type : lb.getStorageTypes()) { + if (expectedStorageType == type) { + actualStorageCount++; + } + } + LOG.info( + expectedStorageType + " replica count, expected=" + + expectedStorageCount + " and actual=" + actualStorageCount); + return expectedStorageCount == actualStorageCount; + } + }, 1000, timeout); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java new file mode 100644 index 00000000000..e4b4290d92f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.junit.Test; + +import java.io.IOException; + +/** + * Test persistence of satisfying files/directories. + */ +public class TestPersistentStoragePolicySatisfier { + + private static Configuration conf; + + private static MiniDFSCluster cluster; + private static DistributedFileSystem fs; + + private static Path testFile = + new Path("/testFile"); + private static String testFileName = testFile.toString(); + + private static Path parentDir = new Path("/parentDir"); + private static Path parentFile = new Path(parentDir, "parentFile"); + private static String parentFileName = parentFile.toString(); + private static Path childDir = new Path(parentDir, "childDir"); + private static Path childFile = new Path(childDir, "childFile"); + private static String childFileName = childFile.toString(); + + private static final String COLD = "COLD"; + private static final String WARM = "WARM"; + private static final String ONE_SSD = "ONE_SSD"; + private static final String ALL_SSD = "ALL_SSD"; + + private static StorageType[][] storageTypes = new StorageType[][] { + {StorageType.ARCHIVE, StorageType.DISK}, + {StorageType.DISK, StorageType.SSD}, + {StorageType.SSD, StorageType.RAM_DISK}, + {StorageType.ARCHIVE, StorageType.DISK}, + {StorageType.ARCHIVE, StorageType.SSD} + }; + + private final int timeout = 300000; + + /** + * Setup environment for every test case. + * @throws IOException + */ + public void clusterSetUp() throws Exception { + clusterSetUp(false); + } + + /** + * Setup cluster environment. + * @param isHAEnabled if true, enable simple HA. + * @throws IOException + */ + private void clusterSetUp(boolean isHAEnabled) throws Exception { + conf = new HdfsConfiguration(); + final int dnNumber = storageTypes.length; + final short replication = 3; + MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf) + .storageTypes(storageTypes) + .numDataNodes(dnNumber); + if (isHAEnabled) { + clusterBuilder.nnTopology(MiniDFSNNTopology.simpleHATopology()); + } + cluster = clusterBuilder.build(); + cluster.waitActive(); + if (isHAEnabled) { + cluster.transitionToActive(0); + fs = HATestUtil.configureFailoverFs(cluster, conf); + } else { + fs = cluster.getFileSystem(); + } + + createTestFiles(fs, replication); + } + + /** + * Setup test files for testing. + * @param dfs + * @param replication + * @throws Exception + */ + private void createTestFiles(DistributedFileSystem dfs, + short replication) throws Exception { + DFSTestUtil.createFile(dfs, testFile, 1024L, replication, 0L); + DFSTestUtil.createFile(dfs, parentFile, 1024L, replication, 0L); + DFSTestUtil.createFile(dfs, childFile, 1024L, replication, 0L); + + DFSTestUtil.waitReplication(dfs, testFile, replication); + DFSTestUtil.waitReplication(dfs, parentFile, replication); + DFSTestUtil.waitReplication(dfs, childFile, replication); + } + + /** + * Tear down environment for every test case. + * @throws IOException + */ + private void clusterShutdown() throws IOException{ + if(fs != null) { + fs.close(); + fs = null; + } + if(cluster != null) { + cluster.shutdown(true); + cluster = null; + } + } + + /** + * While satisfying file/directory, trigger the cluster's checkpoint to + * make sure satisfier persistence work as expected. This test case runs + * as below: + * 1. use satisfyStoragePolicy and add xAttr to the file. + * 2. do the checkpoint by secondary NameNode. + * 3. restart the cluster immediately. + * 4. make sure all the storage policies are satisfied. + * @throws Exception + */ + @Test(timeout = 300000) + public void testWithCheckpoint() throws Exception { + try { + clusterSetUp(); + fs.setStoragePolicy(testFile, WARM); + fs.satisfyStoragePolicy(testFile); + + // Start the checkpoint. + conf.set( + DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + SecondaryNameNode secondary = new SecondaryNameNode(conf); + secondary.doCheckpoint(); + restartCluster(); + + DFSTestUtil.waitExpectedStorageType( + testFileName, StorageType.DISK, 1, timeout, fs); + DFSTestUtil.waitExpectedStorageType( + testFileName, StorageType.ARCHIVE, 2, timeout, fs); + + fs.setStoragePolicy(parentDir, COLD); + fs.satisfyStoragePolicy(parentDir); + + DFSTestUtil.waitExpectedStorageType( + parentFileName, StorageType.ARCHIVE, 3, timeout, fs); + DFSTestUtil.waitExpectedStorageType( + childFileName, StorageType.DEFAULT, 3, timeout, fs); + + } finally { + clusterShutdown(); + } + } + + /** + * Tests to verify satisfier persistence working as expected + * in HA env. This test case runs as below: + * 1. setup HA cluster env with simple HA topology. + * 2. switch the active NameNode from nn0/nn1 to nn1/nn0. + * 3. make sure all the storage policies are satisfied. + * @throws Exception + */ + @Test(timeout = 300000) + public void testWithHA() throws Exception { + try { + // Enable HA env for testing. + clusterSetUp(true); + + fs.setStoragePolicy(testFile, ALL_SSD); + fs.satisfyStoragePolicy(testFile); + + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + + DFSTestUtil.waitExpectedStorageType( + testFileName, StorageType.SSD, 3, timeout, fs); + + // test directory + fs.setStoragePolicy(parentDir, WARM); + fs.satisfyStoragePolicy(parentDir); + cluster.transitionToStandby(1); + cluster.transitionToActive(0); + + DFSTestUtil.waitExpectedStorageType( + parentFileName, StorageType.DISK, 1, timeout, fs); + DFSTestUtil.waitExpectedStorageType( + parentFileName, StorageType.ARCHIVE, 2, timeout, fs); + DFSTestUtil.waitExpectedStorageType( + childFileName, StorageType.DEFAULT, 3, timeout, fs); + } finally { + clusterShutdown(); + } + } + + + /** + * Tests to verify satisfier persistence working well with multiple + * restarts operations. This test case runs as below: + * 1. satisfy the storage policy of file1. + * 2. restart the cluster. + * 3. check whether all the blocks are satisfied. + * 4. satisfy the storage policy of file2. + * 5. restart the cluster. + * 6. check whether all the blocks are satisfied. + * @throws Exception + */ + @Test(timeout = 300000) + public void testWithRestarts() throws Exception { + try { + clusterSetUp(); + fs.setStoragePolicy(testFile, ONE_SSD); + fs.satisfyStoragePolicy(testFile); + restartCluster(); + DFSTestUtil.waitExpectedStorageType( + testFileName, StorageType.SSD, 1, timeout, fs); + DFSTestUtil.waitExpectedStorageType( + testFileName, StorageType.DISK, 2, timeout, fs); + + // test directory + fs.setStoragePolicy(parentDir, COLD); + fs.satisfyStoragePolicy(parentDir); + restartCluster(); + DFSTestUtil.waitExpectedStorageType( + parentFileName, StorageType.ARCHIVE, 3, timeout, fs); + DFSTestUtil.waitExpectedStorageType( + childFileName, StorageType.DEFAULT, 3, timeout, fs); + } finally { + clusterShutdown(); + } + } + + /** + * Tests to verify satisfier persistence working well with + * federal HA env. This test case runs as below: + * 1. setup HA test environment with federal topology. + * 2. satisfy storage policy of file1. + * 3. switch active NameNode from nn0 to nn1. + * 4. switch active NameNode from nn2 to nn3. + * 5. check whether the storage policy of file1 is satisfied. + * @throws Exception + */ + @Test(timeout = 300000) + public void testWithFederationHA() throws Exception { + try { + conf = new HdfsConfiguration(); + final MiniDFSCluster haCluster = new MiniDFSCluster + .Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2)) + .storageTypes(storageTypes) + .numDataNodes(storageTypes.length).build(); + haCluster.waitActive(); + haCluster.transitionToActive(1); + haCluster.transitionToActive(3); + + fs = HATestUtil.configureFailoverFs(haCluster, conf); + createTestFiles(fs, (short) 3); + + fs.setStoragePolicy(testFile, WARM); + fs.satisfyStoragePolicy(testFile); + + haCluster.transitionToStandby(1); + haCluster.transitionToActive(0); + haCluster.transitionToStandby(3); + haCluster.transitionToActive(2); + + DFSTestUtil.waitExpectedStorageType( + testFileName, StorageType.DISK, 1, timeout, fs); + DFSTestUtil.waitExpectedStorageType( + testFileName, StorageType.ARCHIVE, 2, timeout, fs); + + } finally { + clusterShutdown(); + } + } + + /** + * Restart the hole env and trigger the DataNode's heart beats. + * @throws Exception + */ + private void restartCluster() throws Exception { + cluster.restartDataNodes(); + cluster.restartNameNodes(); + cluster.waitActive(); + cluster.triggerHeartbeats(); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java index 9abb78d1859..1c53894908c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java @@ -108,7 +108,8 @@ public void testWhenStoragePolicySetToCOLD() hdfsCluster.triggerHeartbeats(); // Wait till namenode notified about the block location details - waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.ARCHIVE, 3, 30000, dfs); } finally { shutdownCluster(); } @@ -137,7 +138,8 @@ public void testWhenStoragePolicySetToALLSSD() hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier Identified that block to move to SSD // areas - waitExpectedStorageType(file, StorageType.SSD, 3, 30000); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.SSD, 3, 30000, dfs); } finally { shutdownCluster(); } @@ -164,8 +166,10 @@ public void testWhenStoragePolicySetToONESSD() hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier Identified that block to move to SSD // areas - waitExpectedStorageType(file, StorageType.SSD, 1, 30000); - waitExpectedStorageType(file, StorageType.DISK, 2, 30000); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.DISK, 2, 30000, dfs); } finally { shutdownCluster(); } @@ -195,8 +199,10 @@ public void testPerTrackIdBlocksStorageMovementResults() throws Exception { hdfsCluster.triggerHeartbeats(); // Wait till the block is moved to SSD areas - waitExpectedStorageType(file, StorageType.SSD, 1, 30000); - waitExpectedStorageType(file, StorageType.DISK, 2, 30000); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.DISK, 2, 30000, dfs); waitForBlocksMovementResult(1, 30000); } finally { @@ -245,8 +251,10 @@ public void testMultipleFilesForSatisfyStoragePolicy() throws Exception { for (String fileName : files) { // Wait till the block is moved to SSD areas - waitExpectedStorageType(fileName, StorageType.SSD, 1, 30000); - waitExpectedStorageType(fileName, StorageType.DISK, 2, 30000); + DFSTestUtil.waitExpectedStorageType( + fileName, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + fileName, StorageType.DISK, 2, 30000, dfs); } waitForBlocksMovementResult(blockCollectionIds.size(), 30000); @@ -279,7 +287,8 @@ public void testSatisfyFileWithHdfsAdmin() throws Exception { hdfsCluster.triggerHeartbeats(); // Wait till namenode notified about the block location details - waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.ARCHIVE, 3, 30000, dfs); } finally { shutdownCluster(); } @@ -317,11 +326,14 @@ public void testSatisfyDirWithHdfsAdmin() throws Exception { hdfsCluster.triggerHeartbeats(); // take effect for the file in the directory. - waitExpectedStorageType(subFile1, StorageType.SSD, 1, 30000); - waitExpectedStorageType(subFile1, StorageType.DISK, 2, 30000); + DFSTestUtil.waitExpectedStorageType( + subFile1, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + subFile1, StorageType.DISK, 2, 30000, dfs); // take no effect for the sub-dir's file in the directory. - waitExpectedStorageType(subFile2, StorageType.DEFAULT, 3, 30000); + DFSTestUtil.waitExpectedStorageType( + subFile2, StorageType.DEFAULT, 3, 30000, dfs); } finally { shutdownCluster(); } @@ -367,6 +379,20 @@ public void testSatisfyWithExceptions() throws Exception { } catch (FileNotFoundException e) { } + + try { + hdfsAdmin.satisfyStoragePolicy(new Path(file)); + hdfsAdmin.satisfyStoragePolicy(new Path(file)); + Assert.fail(String.format( + "Should failed to satisfy storage policy " + + "for %s ,since it has been " + + "added to satisfy movement queue.", file)); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains( + String.format("Cannot request to call satisfy storage policy " + + "on path %s, as this file/dir was already called for " + + "satisfying storage policy.", file), e); + } } finally { shutdownCluster(); } @@ -407,8 +433,10 @@ public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy() hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier identified that block to move to // ARCHIVE area. - waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000); - waitExpectedStorageType(file, StorageType.DISK, 2, 30000); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.ARCHIVE, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.DISK, 2, 30000, dfs); waitForBlocksMovementResult(1, 30000); } finally { @@ -451,7 +479,8 @@ public void testWhenNoTargetDatanodeToSatisfyStoragePolicy() // No block movement will be scheduled as there is no target node available // with the required storage type. waitForAttemptedItems(1, 30000); - waitExpectedStorageType(file, StorageType.DISK, 3, 30000); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.DISK, 3, 30000, dfs); // Since there is no target node the item will get timed out and then // re-attempted. waitForAttemptedItems(1, 30000); @@ -523,8 +552,10 @@ public void testMoveWithBlockPinning() throws Exception { // with the required storage type. waitForAttemptedItems(1, 30000); waitForBlocksMovementResult(1, 30000); - waitExpectedStorageType(file1, StorageType.ARCHIVE, 1, 30000); - waitExpectedStorageType(file1, StorageType.DISK, 2, 30000); + DFSTestUtil.waitExpectedStorageType( + file1, StorageType.ARCHIVE, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + file1, StorageType.DISK, 2, 30000, dfs); } /** @@ -571,8 +602,10 @@ public void testWhenOnlyFewSourceNodesHaveMatchingTargetNodes() hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier identified that block to move to // ARCHIVE area. - waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000); - waitExpectedStorageType(file, StorageType.DISK, 3, 30000); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.ARCHIVE, 2, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.DISK, 3, 30000, dfs); waitForBlocksMovementResult(1, 30000); } finally { @@ -606,8 +639,10 @@ public void testBlockMoveInSameDatanodeWithONESSD() throws Exception { namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); hdfsCluster.triggerHeartbeats(); - waitExpectedStorageType(file, StorageType.SSD, 1, 30000); - waitExpectedStorageType(file, StorageType.DISK, 2, 30000); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.DISK, 2, 30000, dfs); } finally { shutdownCluster(); @@ -644,8 +679,10 @@ public void testBlockMoveInSameAndRemoteDatanodesWithWARM() throws Exception { namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); hdfsCluster.triggerHeartbeats(); - waitExpectedStorageType(file, StorageType.DISK, 1, 30000); - waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.DISK, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + file, StorageType.ARCHIVE, 2, 30000, dfs); } finally { shutdownCluster(); } @@ -771,33 +808,4 @@ private MiniDFSCluster startCluster(final Configuration conf, cluster.waitActive(); return cluster; } - - // Check whether the Block movement has been successfully completed to satisfy - // the storage policy for the given file. - private void waitExpectedStorageType(final String fileName, - final StorageType expectedStorageType, int expectedStorageCount, - int timeout) throws Exception { - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - LocatedBlock lb = null; - try { - lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0); - } catch (IOException e) { - LOG.error("Exception while getting located blocks", e); - return false; - } - int actualStorageCount = 0; - for (StorageType storageType : lb.getStorageTypes()) { - if (expectedStorageType == storageType) { - actualStorageCount++; - } - } - LOG.info( - expectedStorageType + " replica count, expected={} and actual={}", - expectedStorageType, actualStorageCount); - return expectedStorageCount == actualStorageCount; - } - }, 100, timeout); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java index 5f8639f7295..eb4a6a38ed9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java @@ -66,7 +66,7 @@ public class TestStoragePolicySatisfierWithStripedFile { private int defaultStripeBlockSize; private ErasureCodingPolicy getEcPolicy() { - return ErasureCodingPolicyManager.getSystemDefaultPolicy(); + return StripedFileTestUtil.getDefaultECPolicy(); } /** @@ -99,6 +99,8 @@ public void testMoverWithFullStripe() throws Exception { } final Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, + StripedFileTestUtil.getDefaultECPolicy().getName()); initConfWithStripe(conf, defaultStripeBlockSize); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(numOfDatanodes) @@ -128,7 +130,8 @@ public void testMoverWithFullStripe() throws Exception { client.mkdirs(barDir, new FsPermission((short) 777), true); client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME); // set an EC policy on "/bar" directory - client.setErasureCodingPolicy(barDir, null); + client.setErasureCodingPolicy(barDir, + StripedFileTestUtil.getDefaultECPolicy().getName()); // write file to barDir final String fooFile = "/bar/foo"; @@ -206,6 +209,8 @@ public void testWhenOnlyFewTargetNodesAreAvailableToSatisfyStoragePolicy() } final Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, + StripedFileTestUtil.getDefaultECPolicy().getName()); initConfWithStripe(conf, defaultStripeBlockSize); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(numOfDatanodes) @@ -235,7 +240,8 @@ public void testWhenOnlyFewTargetNodesAreAvailableToSatisfyStoragePolicy() client.mkdirs(barDir, new FsPermission((short) 777), true); client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME); // set an EC policy on "/bar" directory - client.setErasureCodingPolicy(barDir, null); + client.setErasureCodingPolicy(barDir, + StripedFileTestUtil.getDefaultECPolicy().getName()); // write file to barDir final String fooFile = "/bar/foo"; @@ -314,6 +320,8 @@ public void testWhenNoTargetDatanodeToSatisfyStoragePolicy() } final Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, + StripedFileTestUtil.getDefaultECPolicy().getName()); initConfWithStripe(conf, defaultStripeBlockSize); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(numOfDatanodes) @@ -343,7 +351,8 @@ public void testWhenNoTargetDatanodeToSatisfyStoragePolicy() client.mkdirs(barDir, new FsPermission((short) 777), true); client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME); // set an EC policy on "/bar" directory - client.setErasureCodingPolicy(barDir, null); + client.setErasureCodingPolicy(barDir, + StripedFileTestUtil.getDefaultECPolicy().getName()); // write file to barDir final String fooFile = "/bar/foo";