From 6fe6c549e8226b4893f502186f52452dcd9408a2 Mon Sep 17 00:00:00 2001 From: Rakesh Radhakrishnan Date: Thu, 20 Apr 2017 23:14:36 +0530 Subject: [PATCH] HDFS-11572. [SPS]: SPS should clean Xattrs when no blocks required to satisfy for a file. Contributed by Uma Maheswara Rao G --- .../BlockStorageMovementAttemptedItems.java | 2 +- .../namenode/StoragePolicySatisfier.java | 116 +++++++++++++----- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 35 ++++++ .../TestPersistentStoragePolicySatisfier.java | 52 ++++---- .../namenode/TestStoragePolicySatisfier.java | 76 ++++++++++++ 5 files changed, 225 insertions(+), 56 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java index f2406dae4fa..bf7859cb4d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java @@ -333,7 +333,7 @@ public class BlockStorageMovementAttemptedItems { + "doesn't exists in storageMovementAttemptedItems list", storageMovementAttemptedResult.getTrackId()); // Remove xattr for the track id. - this.sps.notifyBlkStorageMovementFinished( + this.sps.postBlkStorageMovementCleanup( storageMovementAttemptedResult.getTrackId()); } break; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index 8be0a2ad419..3b203141ccc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -79,6 +79,27 @@ public class StoragePolicySatisfier implements Runnable { private final BlockStorageMovementAttemptedItems storageMovementsMonitor; private volatile boolean isRunning = false; + /** + * Represents the collective analysis status for all blocks. + */ + private enum BlocksMovingAnalysisStatus { + // Represents that, the analysis skipped due to some conditions. A such + // condition is if block collection is in incomplete state. + ANALYSIS_SKIPPED_FOR_RETRY, + // Represents that, all block storage movement needed blocks found its + // targets. + ALL_BLOCKS_TARGETS_PAIRED, + // Represents that, only fewer or none of the block storage movement needed + // block found its eligible targets. + FEW_BLOCKS_TARGETS_PAIRED, + // Represents that, none of the blocks found for block storage movements. + BLOCKS_ALREADY_SATISFIED, + // Represents that, the analysis skipped due to some conditions. + // Example conditions are if no blocks really exists in block collection or + // if analysis is not required on ec files with unsuitable storage policies + BLOCKS_TARGET_PAIRING_SKIPPED; + } + public StoragePolicySatisfier(final Namesystem namesystem, final BlockStorageMovementNeeded storageMovementNeeded, final BlockManager blkManager, Configuration conf) { @@ -208,10 +229,31 @@ public class StoragePolicySatisfier implements Runnable { namesystem.getBlockCollection(blockCollectionID); // Check blockCollectionId existence. if (blockCollection != null) { - boolean allBlockLocsAttemptedToSatisfy = - computeAndAssignStorageMismatchedBlocksToDNs(blockCollection); - this.storageMovementsMonitor - .add(blockCollectionID, allBlockLocsAttemptedToSatisfy); + BlocksMovingAnalysisStatus status = + analyseBlocksStorageMovementsAndAssignToDN(blockCollection); + switch (status) { + // Just add to monitor, so it will be retried after timeout + case ANALYSIS_SKIPPED_FOR_RETRY: + // Just add to monitor, so it will be tracked for result and + // be removed on successful storage movement result. + case ALL_BLOCKS_TARGETS_PAIRED: + this.storageMovementsMonitor.add(blockCollectionID, true); + break; + // Add to monitor with allBlcoksAttemptedToSatisfy flag false, so + // that it will be tracked and still it will be consider for retry + // as analysis was not found targets for storage movement blocks. + case FEW_BLOCKS_TARGETS_PAIRED: + this.storageMovementsMonitor.add(blockCollectionID, false); + break; + // Just clean Xattrs + case BLOCKS_TARGET_PAIRING_SKIPPED: + case BLOCKS_ALREADY_SATISFIED: + default: + LOG.info("Block analysis skipped or blocks already satisfied" + + " with storages. So, Cleaning up the Xattrs."); + postBlkStorageMovementCleanup(blockCollectionID); + break; + } } } } @@ -235,15 +277,15 @@ public class StoragePolicySatisfier implements Runnable { } LOG.error("StoragePolicySatisfier thread received runtime exception. " + "Stopping Storage policy satisfier work", t); - // TODO: Just break for now. Once we implement dynamic start/stop - // option, we can add conditions here when to break/terminate. break; } } } - private boolean computeAndAssignStorageMismatchedBlocksToDNs( + private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN( BlockCollection blockCollection) { + BlocksMovingAnalysisStatus status = + BlocksMovingAnalysisStatus.BLOCKS_ALREADY_SATISFIED; byte existingStoragePolicyID = blockCollection.getStoragePolicyID(); BlockStoragePolicy existingStoragePolicy = blockManager.getStoragePolicy(existingStoragePolicyID); @@ -252,21 +294,20 @@ public class StoragePolicySatisfier implements Runnable { // So, should we add back? or leave it to user LOG.info("BlockCollectionID: {} file is under construction. So, postpone" + " this to the next retry iteration", blockCollection.getId()); - return true; + return BlocksMovingAnalysisStatus.ANALYSIS_SKIPPED_FOR_RETRY; } // First datanode will be chosen as the co-ordinator node for storage // movements. Later this can be optimized if needed. DatanodeDescriptor coordinatorNode = null; BlockInfo[] blocks = blockCollection.getBlocks(); + if (blocks.length == 0) { + LOG.info("BlockCollectionID: {} file is not having any blocks." + + " So, skipping the analysis.", blockCollection.getId()); + return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED; + } List blockMovingInfos = new ArrayList(); - // True value represents that, SPS is able to find matching target nodes - // to satisfy storage type for all the blocks locations of the given - // blockCollection. A false value represents that, blockCollection needed - // retries to satisfy the storage policy for some of the block locations. - boolean foundMatchingTargetNodesForAllBlocks = true; - for (int i = 0; i < blocks.length; i++) { BlockInfo blockInfo = blocks[i]; List expectedStorageTypes; @@ -283,19 +324,38 @@ public class StoragePolicySatisfier implements Runnable { LOG.warn("The storage policy " + existingStoragePolicy.getName() + " is not suitable for Striped EC files. " + "So, ignoring to move the blocks"); - return false; + return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED; } } else { expectedStorageTypes = existingStoragePolicy .chooseStorageTypes(blockInfo.getReplication()); } - foundMatchingTargetNodesForAllBlocks |= computeBlockMovingInfos( - blockMovingInfos, blockInfo, expectedStorageTypes); + + DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo); + StorageType[] storageTypes = new StorageType[storages.length]; + for (int j = 0; j < storages.length; j++) { + DatanodeStorageInfo datanodeStorageInfo = storages[j]; + StorageType storageType = datanodeStorageInfo.getStorageType(); + storageTypes[j] = storageType; + } + List existing = + new LinkedList(Arrays.asList(storageTypes)); + if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, + existing, true)) { + boolean computeStatus = computeBlockMovingInfos(blockMovingInfos, + blockInfo, expectedStorageTypes, existing, storages); + if (computeStatus + && status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED) { + status = BlocksMovingAnalysisStatus.ALL_BLOCKS_TARGETS_PAIRED; + } else { + status = BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED; + } + } } assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(), blockMovingInfos, coordinatorNode); - return foundMatchingTargetNodesForAllBlocks; + return status; } /** @@ -311,22 +371,18 @@ public class StoragePolicySatisfier implements Runnable { * - block details * @param expectedStorageTypes * - list of expected storage type to satisfy the storage policy + * @param existing + * - list to get existing storage types + * @param storages + * - available storages * @return false if some of the block locations failed to find target node to * satisfy the storage policy, true otherwise */ private boolean computeBlockMovingInfos( List blockMovingInfos, BlockInfo blockInfo, - List expectedStorageTypes) { + List expectedStorageTypes, List existing, + DatanodeStorageInfo[] storages) { boolean foundMatchingTargetNodesForBlock = true; - DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo); - StorageType[] storageTypes = new StorageType[storages.length]; - for (int j = 0; j < storages.length; j++) { - DatanodeStorageInfo datanodeStorageInfo = storages[j]; - StorageType storageType = datanodeStorageInfo.getStorageType(); - storageTypes[j] = storageType; - } - List existing = - new LinkedList(Arrays.asList(storageTypes)); if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, existing, true)) { List sourceWithStorageMap = @@ -756,7 +812,7 @@ public class StoragePolicySatisfier implements Runnable { Long id; while ((id = storageMovementNeeded.get()) != null) { try { - notifyBlkStorageMovementFinished(id); + postBlkStorageMovementCleanup(id); } catch (IOException ie) { LOG.warn("Failed to remove SPS " + "xattr for collection id " + id, ie); @@ -771,7 +827,7 @@ public class StoragePolicySatisfier implements Runnable { * @param trackId track id i.e., block collection id. * @throws IOException */ - public void notifyBlkStorageMovementFinished(long trackId) + public void postBlkStorageMovementCleanup(long trackId) throws IOException { this.namesystem.getFSDirectory().removeSPSXattr(trackId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 8c4107a2778..bab37e45fea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -97,8 +97,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntryScope; import org.apache.hadoop.fs.permission.AclEntryType; @@ -155,9 +158,12 @@ import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; import org.apache.hadoop.hdfs.server.namenode.FSDirectory; import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.namenode.XAttrStorage; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -175,6 +181,7 @@ import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.ShellBasedUnixGroupsMapping; import org.apache.hadoop.security.UserGroupInformation; @@ -2456,4 +2463,32 @@ public class DFSTestUtil { } }, 500, timeout); } + + /** + * Waits for removal of a specified Xattr on a specified file. + * + * @param srcPath + * file name. + * @param xattr + * name of the extended attribute. + * @param ns + * Namesystem + * @param timeout + * max wait time + * @throws Exception + */ + public static void waitForXattrRemoved(String srcPath, String xattr, + Namesystem ns, int timeout) throws TimeoutException, InterruptedException, + UnresolvedLinkException, AccessControlException, + ParentNotDirectoryException { + final INode inode = ns.getFSDirectory().getINode(srcPath); + final XAttr satisfyXAttr = XAttrHelper.buildXAttr(xattr); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + List existingXAttrs = XAttrStorage.readINodeXAttrs(inode); + return !existingXAttrs.contains(satisfyXAttr); + } + }, 100, timeout); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java index 8c3359a9d34..41c272c6ad9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java @@ -20,22 +20,18 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.junit.Test; import java.io.IOException; -import java.util.List; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; -import static org.junit.Assert.assertFalse; /** * Test persistence of satisfying files/directories. @@ -341,15 +337,9 @@ public class TestPersistentStoragePolicySatisfier { DFSTestUtil.waitExpectedStorageType( testFileName, StorageType.DISK, 2, timeout, fs); - // Make sure that SPS xattr has been removed. - int retryTime = 0; - while (retryTime < 30) { - if (!fileContainsSPSXAttr(testFile)) { - break; - } - Thread.sleep(minCheckTimeout); - retryTime += 1; - } + // Make sure satisfy xattr has been removed. + DFSTestUtil.waitForXattrRemoved(testFileName, + XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000); fs.setStoragePolicy(testFile, COLD); fs.satisfyStoragePolicy(testFile); @@ -379,7 +369,8 @@ public class TestPersistentStoragePolicySatisfier { cluster.getNamesystem().getBlockManager().deactivateSPS(); // Make sure satisfy xattr has been removed. - assertFalse(fileContainsSPSXAttr(testFile)); + DFSTestUtil.waitForXattrRemoved(testFileName, + XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000); } finally { clusterShutdown(); @@ -387,18 +378,29 @@ public class TestPersistentStoragePolicySatisfier { } /** - * Check whether file contains SPS xattr. - * @param fileName file name. - * @return true if file contains SPS xattr. - * @throws IOException + * Tests that Xattrs should be cleaned if all blocks already satisfied. + * + * @throws Exception */ - private boolean fileContainsSPSXAttr(Path fileName) throws IOException { - final INode inode = cluster.getNamesystem() - .getFSDirectory().getINode(fileName.toString()); - final XAttr satisfyXAttr = - XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY); - List existingXAttrs = XAttrStorage.readINodeXAttrs(inode); - return existingXAttrs.contains(satisfyXAttr); + @Test(timeout = 300000) + public void testSPSShouldNotLeakXattrIfStorageAlreadySatisfied() + throws Exception { + try { + clusterSetUp(); + DFSTestUtil.waitExpectedStorageType(testFileName, StorageType.DISK, 3, + timeout, fs); + fs.satisfyStoragePolicy(testFile); + + DFSTestUtil.waitExpectedStorageType(testFileName, StorageType.DISK, 3, + timeout, fs); + + // Make sure satisfy xattr has been removed. + DFSTestUtil.waitForXattrRemoved(testFileName, + XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000); + + } finally { + clusterShutdown(); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java index 2a3345564d1..8457e5b043d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import static org.junit.Assert.assertNull; import java.io.FileNotFoundException; @@ -34,13 +35,17 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.client.HdfsAdmin; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; @@ -828,6 +833,77 @@ public class TestStoragePolicySatisfier { } } + /** + * Tests that Xattrs should be cleaned if satisfy storage policy called on EC + * file with unsuitable storage policy set. + * + * @throws Exception + */ + @Test(timeout = 300000) + public void testSPSShouldNotLeakXattrIfSatisfyStoragePolicyCallOnECFiles() + throws Exception { + StorageType[][] diskTypes = + new StorageType[][]{{StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.DISK, StorageType.SSD}, + {StorageType.DISK, StorageType.SSD}, + {StorageType.DISK, StorageType.SSD}, + {StorageType.DISK, StorageType.SSD}, + {StorageType.DISK, StorageType.SSD}}; + + int defaultStripedBlockSize = + ErasureCodingPolicyManager.getSystemPolicies()[0].getCellSize() * 4; + config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultStripedBlockSize); + config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + config.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, + 1L); + config.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, + false); + + try { + hdfsCluster = startCluster(config, diskTypes, diskTypes.length, + storagesPerDatanode, capacity); + + // set "/foo" directory with ONE_SSD storage policy. + ClientProtocol client = NameNodeProxies.createProxy(config, + hdfsCluster.getFileSystem(0).getUri(), ClientProtocol.class) + .getProxy(); + String fooDir = "/foo"; + client.mkdirs(fooDir, new FsPermission((short) 777), true); + // set an EC policy on "/foo" directory + client.setErasureCodingPolicy(fooDir, null); + + // write file to fooDir + final String testFile = "/foo/bar"; + long fileLen = 20 * defaultStripedBlockSize; + dfs = hdfsCluster.getFileSystem(); + DFSTestUtil.createFile(dfs, new Path(testFile), fileLen, (short) 3, 0); + + // ONESSD is unsuitable storage policy on EC files + client.setStoragePolicy(fooDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME); + dfs.satisfyStoragePolicy(new Path(testFile)); + + // Thread.sleep(9000); // To make sure SPS triggered + // verify storage types and locations + LocatedBlocks locatedBlocks = + client.getBlockLocations(testFile, 0, fileLen); + for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { + for (StorageType type : lb.getStorageTypes()) { + Assert.assertEquals(StorageType.DISK, type); + } + } + + // Make sure satisfy xattr has been removed. + DFSTestUtil.waitForXattrRemoved(testFile, XATTR_SATISFY_STORAGE_POLICY, + hdfsCluster.getNamesystem(), 30000); + } finally { + shutdownCluster(); + } + } + private String createFileAndSimulateFavoredNodes(int favoredNodesCount) throws IOException { ArrayList dns = hdfsCluster.getDataNodes();