From 551024915d487957d9e829493ab319c8e31dfa81 Mon Sep 17 00:00:00 2001 From: Daryn Sharp Date: Fri, 18 Jul 2014 17:58:07 +0000 Subject: [PATCH] HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611737 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hadoop/hdfs/protocol/DatanodeInfo.java | 2 +- .../BlockPlacementPolicyDefault.java | 14 +- .../blockmanagement/DatanodeManager.java | 4 +- .../blockmanagement/DatanodeStatistics.java | 6 + .../blockmanagement/HeartbeatManager.java | 21 +- .../hdfs/server/namenode/FSClusterStats.java | 9 + .../hdfs/server/namenode/FSNamesystem.java | 13 +- .../TestReplicationPolicyConsiderLoad.java | 24 ++- .../namenode/TestNamenodeCapacityReport.java | 186 +++++++++++++++++- 10 files changed, 259 insertions(+), 22 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6c815fc30a0..9433d62a8ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -595,6 +595,8 @@ Release 2.5.0 - UNRELEASED HDFS-6583. Remove clientNode in FileUnderConstructionFeature. (wheat9) + HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn) + BUG FIXES HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java index cb0f081b99f..9fcada734ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java @@ -339,7 +339,7 @@ public class DatanodeInfo extends DatanodeID implements Node { buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n"); buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n"); buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n"); - + buffer.append("Xceivers: "+getXceiverCount()+"\n"); buffer.append("Last contact: "+new Date(lastUpdate)+"\n"); return buffer.toString(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 35333363ece..accdddf5250 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -636,15 +636,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // check the communication traffic of the target machine if (considerLoad) { - double avgLoad = 0; - if (stats != null) { - int size = stats.getNumDatanodesInService(); - if (size != 0) { - avgLoad = (double)stats.getTotalLoad()/size; - } - } - if (node.getXceiverCount() > (2.0 * avgLoad)) { - logNodeIsNotChosen(storage, "the node is too busy "); + final double maxLoad = 2.0 * stats.getInServiceXceiverAverage(); + final int nodeLoad = node.getXceiverCount(); + if (nodeLoad > maxLoad) { + logNodeIsNotChosen(storage, + "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") "); return false; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index aea34ecbc90..791c6dff5be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -820,7 +820,9 @@ public class DatanodeManager { } /** Start decommissioning the specified datanode. */ - private void startDecommission(DatanodeDescriptor node) { + @InterfaceAudience.Private + @VisibleForTesting + public void startDecommission(DatanodeDescriptor node) { if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { for (DatanodeStorageInfo storage : node.getStorageInfos()) { LOG.info("Start Decommissioning " + node + " " + storage diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java index 9ccc5b1ff4a..c9bc3e5ea67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java @@ -52,6 +52,12 @@ public interface DatanodeStatistics { /** @return the xceiver count */ public int getXceiverCount(); + /** @return average xceiver count for non-decommission(ing|ed) nodes */ + public int getInServiceXceiverCount(); + + /** @return number of non-decommission(ing|ed) nodes */ + public int getNumDatanodesInService(); + /** * @return the total used space by data nodes for non-DFS purposes * such as storing temporary files on the local file system diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index e7017a30a93..901f7e3653d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -150,6 +150,16 @@ class HeartbeatManager implements DatanodeStatistics { return stats.xceiverCount; } + @Override + public synchronized int getInServiceXceiverCount() { + return stats.nodesInServiceXceiverCount; + } + + @Override + public synchronized int getNumDatanodesInService() { + return stats.nodesInService; + } + @Override public synchronized long getCacheCapacity() { return stats.cacheCapacity; @@ -178,7 +188,7 @@ class HeartbeatManager implements DatanodeStatistics { } synchronized void register(final DatanodeDescriptor d) { - if (!datanodes.contains(d)) { + if (!d.isAlive) { addDatanode(d); //update its timestamp @@ -191,6 +201,8 @@ class HeartbeatManager implements DatanodeStatistics { } synchronized void addDatanode(final DatanodeDescriptor d) { + // update in-service node count + stats.add(d); datanodes.add(d); d.isAlive = true; } @@ -323,6 +335,9 @@ class HeartbeatManager implements DatanodeStatistics { private long cacheCapacity = 0L; private long cacheUsed = 0L; + private int nodesInService = 0; + private int nodesInServiceXceiverCount = 0; + private int expiredHeartbeats = 0; private void add(final DatanodeDescriptor node) { @@ -330,6 +345,8 @@ class HeartbeatManager implements DatanodeStatistics { blockPoolUsed += node.getBlockPoolUsed(); xceiverCount += node.getXceiverCount(); if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + nodesInService++; + nodesInServiceXceiverCount += node.getXceiverCount(); capacityTotal += node.getCapacity(); capacityRemaining += node.getRemaining(); } else { @@ -344,6 +361,8 @@ class HeartbeatManager implements DatanodeStatistics { blockPoolUsed -= node.getBlockPoolUsed(); xceiverCount -= node.getXceiverCount(); if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + nodesInService--; + nodesInServiceXceiverCount -= node.getXceiverCount(); capacityTotal -= node.getCapacity(); capacityRemaining -= node.getRemaining(); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java index 676aa0826c0..1a859a7b93e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java @@ -48,6 +48,15 @@ public interface FSClusterStats { * @return Number of datanodes that are both alive and not decommissioned. */ public int getNumDatanodesInService(); + + /** + * an indication of the average load of non-decommission(ing|ed) nodes + * eligible for block placement + * + * @return average of the in service number of block transfers and block + * writes that are currently occurring on the cluster. + */ + public double getInServiceXceiverAverage(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 81db6786dd4..7e0394a022f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -7320,7 +7320,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats, @Override // FSClusterStats public int getNumDatanodesInService() { - return getNumLiveDataNodes() - getNumDecomLiveDataNodes(); + return datanodeStatistics.getNumDatanodesInService(); + } + + @Override // for block placement strategy + public double getInServiceXceiverAverage() { + double avgLoad = 0; + final int nodes = getNumDatanodesInService(); + if (nodes != 0) { + final int xceivers = datanodeStatistics.getInServiceXceiverCount(); + avgLoad = (double)xceivers/nodes; + } + return avgLoad; } public SnapshotManager getSnapshotManager() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java index 73d397760a4..bf972c03ca7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.test.PathUtils; @@ -101,6 +102,7 @@ public class TestReplicationPolicyConsiderLoad { } } + private final double EPSILON = 0.0001; /** * Tests that chooseTarget with considerLoad set to true correctly calculates * load with decommissioned nodes. @@ -109,14 +111,6 @@ public class TestReplicationPolicyConsiderLoad { public void testChooseTargetWithDecomNodes() throws IOException { namenode.getNamesystem().writeLock(); try { - // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget() - // returns false - for (int i = 0; i < 3; i++) { - DatanodeInfo d = dnManager.getDatanodeByXferAddr( - dnrList.get(i).getIpAddr(), - dnrList.get(i).getXferPort()); - d.setDecommissioned(); - } String blockPoolId = namenode.getNamesystem().getBlockPoolId(); dnManager.handleHeartbeat(dnrList.get(3), BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]), @@ -133,6 +127,20 @@ public class TestReplicationPolicyConsiderLoad { blockPoolId, dataNodes[5].getCacheCapacity(), dataNodes[5].getCacheRemaining(), 4, 0, 0); + // value in the above heartbeats + final int load = 2 + 4 + 4; + + FSNamesystem fsn = namenode.getNamesystem(); + assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON); + + // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget() + // returns false + for (int i = 0; i < 3; i++) { + DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i)); + dnManager.startDecommission(d); + d.setDecommissioned(); + } + assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON); // Call chooseTarget() DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java index 2883f99f692..48a4b3c6c76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hdfs.server.namenode; -import static org.junit.Assert.assertTrue; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY; +import static org.junit.Assert.*; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -28,12 +30,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DF; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.junit.Test; @@ -153,4 +164,177 @@ public class TestNamenodeCapacityReport { if (cluster != null) {cluster.shutdown();} } } + + private static final float EPSILON = 0.0001f; + @Test + public void testXceiverCount() throws Exception { + Configuration conf = new HdfsConfiguration(); + // don't waste time retrying if close fails + conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 0); + MiniDFSCluster cluster = null; + + final int nodes = 8; + final int fileCount = 5; + final short fileRepl = 3; + + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(nodes).build(); + cluster.waitActive(); + + final FSNamesystem namesystem = cluster.getNamesystem(); + final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager(); + List datanodes = cluster.getDataNodes(); + final DistributedFileSystem fs = cluster.getFileSystem(); + + // trigger heartbeats in case not already sent + triggerHeartbeats(datanodes); + + // check that all nodes are live and in service + int expectedTotalLoad = nodes; // xceiver server adds 1 to load + int expectedInServiceNodes = nodes; + int expectedInServiceLoad = nodes; + assertEquals(nodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService()); + assertEquals(expectedTotalLoad, namesystem.getTotalLoad()); + assertEquals((double)expectedInServiceLoad/expectedInServiceLoad, + namesystem.getInServiceXceiverAverage(), EPSILON); + + // shutdown half the nodes and force a heartbeat check to ensure + // counts are accurate + for (int i=0; i < nodes/2; i++) { + DataNode dn = datanodes.get(i); + DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId()); + dn.shutdown(); + dnd.setLastUpdate(0L); + BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager()); + expectedInServiceNodes--; + assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService()); + } + + // restart the nodes to verify that counts are correct after + // node re-registration + cluster.restartDataNodes(); + cluster.waitActive(); + datanodes = cluster.getDataNodes(); + expectedInServiceNodes = nodes; + assertEquals(nodes, datanodes.size()); + assertEquals(nodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService()); + assertEquals(expectedTotalLoad, namesystem.getTotalLoad()); + assertEquals((double)expectedInServiceLoad/expectedInServiceLoad, + namesystem.getInServiceXceiverAverage(), EPSILON); + + // create streams and hsync to force datastreamers to start + DFSOutputStream[] streams = new DFSOutputStream[fileCount]; + for (int i=0; i < fileCount; i++) { + streams[i] = (DFSOutputStream)fs.create(new Path("/f"+i), fileRepl) + .getWrappedStream(); + streams[i].write("1".getBytes()); + streams[i].hsync(); + // the load for writers is 2 because both the write xceiver & packet + // responder threads are counted in the load + expectedTotalLoad += 2*fileRepl; + expectedInServiceLoad += 2*fileRepl; + } + // force nodes to send load update + triggerHeartbeats(datanodes); + assertEquals(nodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, + namesystem.getNumDatanodesInService()); + assertEquals(expectedTotalLoad, namesystem.getTotalLoad()); + assertEquals((double)expectedInServiceLoad/expectedInServiceNodes, + namesystem.getInServiceXceiverAverage(), EPSILON); + + // decomm a few nodes, substract their load from the expected load, + // trigger heartbeat to force load update + for (int i=0; i < fileRepl; i++) { + expectedInServiceNodes--; + DatanodeDescriptor dnd = + dnm.getDatanode(datanodes.get(i).getDatanodeId()); + expectedInServiceLoad -= dnd.getXceiverCount(); + dnm.startDecommission(dnd); + DataNodeTestUtils.triggerHeartbeat(datanodes.get(i)); + Thread.sleep(100); + assertEquals(nodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, + namesystem.getNumDatanodesInService()); + assertEquals(expectedTotalLoad, namesystem.getTotalLoad()); + assertEquals((double)expectedInServiceLoad/expectedInServiceNodes, + namesystem.getInServiceXceiverAverage(), EPSILON); + } + + // check expected load while closing each stream. recalc expected + // load based on whether the nodes in the pipeline are decomm + for (int i=0; i < fileCount; i++) { + int decomm = 0; + for (DatanodeInfo dni : streams[i].getPipeline()) { + DatanodeDescriptor dnd = dnm.getDatanode(dni); + expectedTotalLoad -= 2; + if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) { + decomm++; + } else { + expectedInServiceLoad -= 2; + } + } + try { + streams[i].close(); + } catch (IOException ioe) { + // nodes will go decommissioned even if there's a UC block whose + // other locations are decommissioned too. we'll ignore that + // bug for now + if (decomm < fileRepl) { + throw ioe; + } + } + triggerHeartbeats(datanodes); + // verify node count and loads + assertEquals(nodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, + namesystem.getNumDatanodesInService()); + assertEquals(expectedTotalLoad, namesystem.getTotalLoad()); + assertEquals((double)expectedInServiceLoad/expectedInServiceNodes, + namesystem.getInServiceXceiverAverage(), EPSILON); + } + + // shutdown each node, verify node counts based on decomm state + for (int i=0; i < nodes; i++) { + DataNode dn = datanodes.get(i); + dn.shutdown(); + // force it to appear dead so live count decreases + DatanodeDescriptor dnDesc = dnm.getDatanode(dn.getDatanodeId()); + dnDesc.setLastUpdate(0L); + BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager()); + assertEquals(nodes-1-i, namesystem.getNumLiveDataNodes()); + // first few nodes are already out of service + if (i >= fileRepl) { + expectedInServiceNodes--; + } + assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService()); + + // live nodes always report load of 1. no nodes is load 0 + double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0; + assertEquals((double)expectedXceiverAvg, + namesystem.getInServiceXceiverAverage(), EPSILON); + } + + // final sanity check + assertEquals(0, namesystem.getNumLiveDataNodes()); + assertEquals(0, namesystem.getNumDatanodesInService()); + assertEquals(0.0, namesystem.getTotalLoad(), EPSILON); + assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private void triggerHeartbeats(List datanodes) + throws IOException, InterruptedException { + for (DataNode dn : datanodes) { + DataNodeTestUtils.triggerHeartbeat(dn); + } + Thread.sleep(100); + } }