HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611737 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
08466eaa00
commit
551024915d
|
@ -595,6 +595,8 @@ Release 2.5.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-6583. Remove clientNode in FileUnderConstructionFeature. (wheat9)
|
HDFS-6583. Remove clientNode in FileUnderConstructionFeature. (wheat9)
|
||||||
|
|
||||||
|
HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration.
|
HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration.
|
||||||
|
|
|
@ -339,7 +339,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||||
buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n");
|
buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n");
|
||||||
buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n");
|
buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n");
|
||||||
buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n");
|
buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n");
|
||||||
|
buffer.append("Xceivers: "+getXceiverCount()+"\n");
|
||||||
buffer.append("Last contact: "+new Date(lastUpdate)+"\n");
|
buffer.append("Last contact: "+new Date(lastUpdate)+"\n");
|
||||||
return buffer.toString();
|
return buffer.toString();
|
||||||
}
|
}
|
||||||
|
|
|
@ -636,15 +636,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
|
|
||||||
// check the communication traffic of the target machine
|
// check the communication traffic of the target machine
|
||||||
if (considerLoad) {
|
if (considerLoad) {
|
||||||
double avgLoad = 0;
|
final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
|
||||||
if (stats != null) {
|
final int nodeLoad = node.getXceiverCount();
|
||||||
int size = stats.getNumDatanodesInService();
|
if (nodeLoad > maxLoad) {
|
||||||
if (size != 0) {
|
logNodeIsNotChosen(storage,
|
||||||
avgLoad = (double)stats.getTotalLoad()/size;
|
"the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
|
||||||
}
|
|
||||||
}
|
|
||||||
if (node.getXceiverCount() > (2.0 * avgLoad)) {
|
|
||||||
logNodeIsNotChosen(storage, "the node is too busy ");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -820,7 +820,9 @@ public class DatanodeManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Start decommissioning the specified datanode. */
|
/** Start decommissioning the specified datanode. */
|
||||||
private void startDecommission(DatanodeDescriptor node) {
|
@InterfaceAudience.Private
|
||||||
|
@VisibleForTesting
|
||||||
|
public void startDecommission(DatanodeDescriptor node) {
|
||||||
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
|
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
|
||||||
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
|
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
|
||||||
LOG.info("Start Decommissioning " + node + " " + storage
|
LOG.info("Start Decommissioning " + node + " " + storage
|
||||||
|
|
|
@ -52,6 +52,12 @@ public interface DatanodeStatistics {
|
||||||
/** @return the xceiver count */
|
/** @return the xceiver count */
|
||||||
public int getXceiverCount();
|
public int getXceiverCount();
|
||||||
|
|
||||||
|
/** @return total xceiver count across non-decommission(ing|ed) nodes */
|
||||||
|
public int getInServiceXceiverCount();
|
||||||
|
|
||||||
|
/** @return number of non-decommission(ing|ed) nodes */
|
||||||
|
public int getNumDatanodesInService();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the total used space by data nodes for non-DFS purposes
|
* @return the total used space by data nodes for non-DFS purposes
|
||||||
* such as storing temporary files on the local file system
|
* such as storing temporary files on the local file system
|
||||||
|
|
|
@ -150,6 +150,16 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
return stats.xceiverCount;
|
return stats.xceiverCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized int getInServiceXceiverCount() {
|
||||||
|
return stats.nodesInServiceXceiverCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized int getNumDatanodesInService() {
|
||||||
|
return stats.nodesInService;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized long getCacheCapacity() {
|
public synchronized long getCacheCapacity() {
|
||||||
return stats.cacheCapacity;
|
return stats.cacheCapacity;
|
||||||
|
@ -178,7 +188,7 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void register(final DatanodeDescriptor d) {
|
synchronized void register(final DatanodeDescriptor d) {
|
||||||
if (!datanodes.contains(d)) {
|
if (!d.isAlive) {
|
||||||
addDatanode(d);
|
addDatanode(d);
|
||||||
|
|
||||||
//update its timestamp
|
//update its timestamp
|
||||||
|
@ -191,6 +201,8 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void addDatanode(final DatanodeDescriptor d) {
|
synchronized void addDatanode(final DatanodeDescriptor d) {
|
||||||
|
// update in-service node count
|
||||||
|
stats.add(d);
|
||||||
datanodes.add(d);
|
datanodes.add(d);
|
||||||
d.isAlive = true;
|
d.isAlive = true;
|
||||||
}
|
}
|
||||||
|
@ -323,6 +335,9 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
private long cacheCapacity = 0L;
|
private long cacheCapacity = 0L;
|
||||||
private long cacheUsed = 0L;
|
private long cacheUsed = 0L;
|
||||||
|
|
||||||
|
private int nodesInService = 0;
|
||||||
|
private int nodesInServiceXceiverCount = 0;
|
||||||
|
|
||||||
private int expiredHeartbeats = 0;
|
private int expiredHeartbeats = 0;
|
||||||
|
|
||||||
private void add(final DatanodeDescriptor node) {
|
private void add(final DatanodeDescriptor node) {
|
||||||
|
@ -330,6 +345,8 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
blockPoolUsed += node.getBlockPoolUsed();
|
blockPoolUsed += node.getBlockPoolUsed();
|
||||||
xceiverCount += node.getXceiverCount();
|
xceiverCount += node.getXceiverCount();
|
||||||
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
|
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
|
||||||
|
nodesInService++;
|
||||||
|
nodesInServiceXceiverCount += node.getXceiverCount();
|
||||||
capacityTotal += node.getCapacity();
|
capacityTotal += node.getCapacity();
|
||||||
capacityRemaining += node.getRemaining();
|
capacityRemaining += node.getRemaining();
|
||||||
} else {
|
} else {
|
||||||
|
@ -344,6 +361,8 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
blockPoolUsed -= node.getBlockPoolUsed();
|
blockPoolUsed -= node.getBlockPoolUsed();
|
||||||
xceiverCount -= node.getXceiverCount();
|
xceiverCount -= node.getXceiverCount();
|
||||||
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
|
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
|
||||||
|
nodesInService--;
|
||||||
|
nodesInServiceXceiverCount -= node.getXceiverCount();
|
||||||
capacityTotal -= node.getCapacity();
|
capacityTotal -= node.getCapacity();
|
||||||
capacityRemaining -= node.getRemaining();
|
capacityRemaining -= node.getRemaining();
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -48,6 +48,15 @@ public interface FSClusterStats {
|
||||||
* @return Number of datanodes that are both alive and not decommissioned.
|
* @return Number of datanodes that are both alive and not decommissioned.
|
||||||
*/
|
*/
|
||||||
public int getNumDatanodesInService();
|
public int getNumDatanodesInService();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* an indication of the average load of non-decommission(ing|ed) nodes
|
||||||
|
* eligible for block placement
|
||||||
|
*
|
||||||
|
* @return average of the in service number of block transfers and block
|
||||||
|
* writes that are currently occurring on the cluster.
|
||||||
|
*/
|
||||||
|
public double getInServiceXceiverAverage();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -7320,7 +7320,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
|
|
||||||
@Override // FSClusterStats
|
@Override // FSClusterStats
|
||||||
public int getNumDatanodesInService() {
|
public int getNumDatanodesInService() {
|
||||||
return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
|
return datanodeStatistics.getNumDatanodesInService();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override // for block placement strategy
|
||||||
|
public double getInServiceXceiverAverage() {
|
||||||
|
double avgLoad = 0;
|
||||||
|
final int nodes = getNumDatanodesInService();
|
||||||
|
if (nodes != 0) {
|
||||||
|
final int xceivers = datanodeStatistics.getInServiceXceiverCount();
|
||||||
|
avgLoad = (double)xceivers/nodes;
|
||||||
|
}
|
||||||
|
return avgLoad;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SnapshotManager getSnapshotManager() {
|
public SnapshotManager getSnapshotManager() {
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.test.PathUtils;
|
import org.apache.hadoop.test.PathUtils;
|
||||||
|
@ -101,6 +102,7 @@ public class TestReplicationPolicyConsiderLoad {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final double EPSILON = 0.0001;
|
||||||
/**
|
/**
|
||||||
* Tests that chooseTarget with considerLoad set to true correctly calculates
|
* Tests that chooseTarget with considerLoad set to true correctly calculates
|
||||||
* load with decommissioned nodes.
|
* load with decommissioned nodes.
|
||||||
|
@ -109,14 +111,6 @@ public class TestReplicationPolicyConsiderLoad {
|
||||||
public void testChooseTargetWithDecomNodes() throws IOException {
|
public void testChooseTargetWithDecomNodes() throws IOException {
|
||||||
namenode.getNamesystem().writeLock();
|
namenode.getNamesystem().writeLock();
|
||||||
try {
|
try {
|
||||||
// Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
|
|
||||||
// returns false
|
|
||||||
for (int i = 0; i < 3; i++) {
|
|
||||||
DatanodeInfo d = dnManager.getDatanodeByXferAddr(
|
|
||||||
dnrList.get(i).getIpAddr(),
|
|
||||||
dnrList.get(i).getXferPort());
|
|
||||||
d.setDecommissioned();
|
|
||||||
}
|
|
||||||
String blockPoolId = namenode.getNamesystem().getBlockPoolId();
|
String blockPoolId = namenode.getNamesystem().getBlockPoolId();
|
||||||
dnManager.handleHeartbeat(dnrList.get(3),
|
dnManager.handleHeartbeat(dnrList.get(3),
|
||||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
|
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
|
||||||
|
@ -133,6 +127,20 @@ public class TestReplicationPolicyConsiderLoad {
|
||||||
blockPoolId, dataNodes[5].getCacheCapacity(),
|
blockPoolId, dataNodes[5].getCacheCapacity(),
|
||||||
dataNodes[5].getCacheRemaining(),
|
dataNodes[5].getCacheRemaining(),
|
||||||
4, 0, 0);
|
4, 0, 0);
|
||||||
|
// total xceiver count reported in the above heartbeats
|
||||||
|
final int load = 2 + 4 + 4;
|
||||||
|
|
||||||
|
FSNamesystem fsn = namenode.getNamesystem();
|
||||||
|
assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
|
||||||
|
// Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
|
||||||
|
// returns false
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
|
||||||
|
dnManager.startDecommission(d);
|
||||||
|
d.setDecommissioned();
|
||||||
|
}
|
||||||
|
assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
|
||||||
// Call chooseTarget()
|
// Call chooseTarget()
|
||||||
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
|
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
|
||||||
|
|
|
@ -18,9 +18,11 @@
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -28,12 +30,21 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.DF;
|
import org.apache.hadoop.fs.DF;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
@ -153,4 +164,177 @@ public class TestNamenodeCapacityReport {
|
||||||
if (cluster != null) {cluster.shutdown();}
|
if (cluster != null) {cluster.shutdown();}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final float EPSILON = 0.0001f;
|
||||||
|
@Test
|
||||||
|
public void testXceiverCount() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
// don't waste time retrying if close fails
|
||||||
|
conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 0);
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
|
final int nodes = 8;
|
||||||
|
final int fileCount = 5;
|
||||||
|
final short fileRepl = 3;
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(nodes).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
final FSNamesystem namesystem = cluster.getNamesystem();
|
||||||
|
final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
|
||||||
|
List<DataNode> datanodes = cluster.getDataNodes();
|
||||||
|
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
|
||||||
|
// trigger heartbeats in case not already sent
|
||||||
|
triggerHeartbeats(datanodes);
|
||||||
|
|
||||||
|
// check that all nodes are live and in service
|
||||||
|
int expectedTotalLoad = nodes; // xceiver server adds 1 to load
|
||||||
|
int expectedInServiceNodes = nodes;
|
||||||
|
int expectedInServiceLoad = nodes;
|
||||||
|
assertEquals(nodes, namesystem.getNumLiveDataNodes());
|
||||||
|
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
|
||||||
|
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
|
||||||
|
assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
|
||||||
|
namesystem.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
|
||||||
|
// shutdown half the nodes and force a heartbeat check to ensure
|
||||||
|
// counts are accurate
|
||||||
|
for (int i=0; i < nodes/2; i++) {
|
||||||
|
DataNode dn = datanodes.get(i);
|
||||||
|
DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId());
|
||||||
|
dn.shutdown();
|
||||||
|
dnd.setLastUpdate(0L);
|
||||||
|
BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
|
||||||
|
expectedInServiceNodes--;
|
||||||
|
assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
|
||||||
|
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
|
||||||
|
}
|
||||||
|
|
||||||
|
// restart the nodes to verify that counts are correct after
|
||||||
|
// node re-registration
|
||||||
|
cluster.restartDataNodes();
|
||||||
|
cluster.waitActive();
|
||||||
|
datanodes = cluster.getDataNodes();
|
||||||
|
expectedInServiceNodes = nodes;
|
||||||
|
assertEquals(nodes, datanodes.size());
|
||||||
|
assertEquals(nodes, namesystem.getNumLiveDataNodes());
|
||||||
|
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
|
||||||
|
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
|
||||||
|
assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
|
||||||
|
namesystem.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
|
||||||
|
// create streams and hsync to force datastreamers to start
|
||||||
|
DFSOutputStream[] streams = new DFSOutputStream[fileCount];
|
||||||
|
for (int i=0; i < fileCount; i++) {
|
||||||
|
streams[i] = (DFSOutputStream)fs.create(new Path("/f"+i), fileRepl)
|
||||||
|
.getWrappedStream();
|
||||||
|
streams[i].write("1".getBytes());
|
||||||
|
streams[i].hsync();
|
||||||
|
// the load for writers is 2 because both the write xceiver & packet
|
||||||
|
// responder threads are counted in the load
|
||||||
|
expectedTotalLoad += 2*fileRepl;
|
||||||
|
expectedInServiceLoad += 2*fileRepl;
|
||||||
|
}
|
||||||
|
// force nodes to send load update
|
||||||
|
triggerHeartbeats(datanodes);
|
||||||
|
assertEquals(nodes, namesystem.getNumLiveDataNodes());
|
||||||
|
assertEquals(expectedInServiceNodes,
|
||||||
|
namesystem.getNumDatanodesInService());
|
||||||
|
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
|
||||||
|
assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
|
||||||
|
namesystem.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
|
||||||
|
// decomm a few nodes, subtract their load from the expected load,
|
||||||
|
// trigger heartbeat to force load update
|
||||||
|
for (int i=0; i < fileRepl; i++) {
|
||||||
|
expectedInServiceNodes--;
|
||||||
|
DatanodeDescriptor dnd =
|
||||||
|
dnm.getDatanode(datanodes.get(i).getDatanodeId());
|
||||||
|
expectedInServiceLoad -= dnd.getXceiverCount();
|
||||||
|
dnm.startDecommission(dnd);
|
||||||
|
DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
|
||||||
|
Thread.sleep(100);
|
||||||
|
assertEquals(nodes, namesystem.getNumLiveDataNodes());
|
||||||
|
assertEquals(expectedInServiceNodes,
|
||||||
|
namesystem.getNumDatanodesInService());
|
||||||
|
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
|
||||||
|
assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
|
||||||
|
namesystem.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check expected load while closing each stream. recalc expected
|
||||||
|
// load based on whether the nodes in the pipeline are decomm
|
||||||
|
for (int i=0; i < fileCount; i++) {
|
||||||
|
int decomm = 0;
|
||||||
|
for (DatanodeInfo dni : streams[i].getPipeline()) {
|
||||||
|
DatanodeDescriptor dnd = dnm.getDatanode(dni);
|
||||||
|
expectedTotalLoad -= 2;
|
||||||
|
if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) {
|
||||||
|
decomm++;
|
||||||
|
} else {
|
||||||
|
expectedInServiceLoad -= 2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
streams[i].close();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// nodes will go decommissioned even if there's a UC block whose
|
||||||
|
// other locations are decommissioned too. we'll ignore that
|
||||||
|
// bug for now
|
||||||
|
if (decomm < fileRepl) {
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
triggerHeartbeats(datanodes);
|
||||||
|
// verify node count and loads
|
||||||
|
assertEquals(nodes, namesystem.getNumLiveDataNodes());
|
||||||
|
assertEquals(expectedInServiceNodes,
|
||||||
|
namesystem.getNumDatanodesInService());
|
||||||
|
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
|
||||||
|
assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
|
||||||
|
namesystem.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
}
|
||||||
|
|
||||||
|
// shutdown each node, verify node counts based on decomm state
|
||||||
|
for (int i=0; i < nodes; i++) {
|
||||||
|
DataNode dn = datanodes.get(i);
|
||||||
|
dn.shutdown();
|
||||||
|
// force it to appear dead so live count decreases
|
||||||
|
DatanodeDescriptor dnDesc = dnm.getDatanode(dn.getDatanodeId());
|
||||||
|
dnDesc.setLastUpdate(0L);
|
||||||
|
BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
|
||||||
|
assertEquals(nodes-1-i, namesystem.getNumLiveDataNodes());
|
||||||
|
// first few nodes are already out of service
|
||||||
|
if (i >= fileRepl) {
|
||||||
|
expectedInServiceNodes--;
|
||||||
|
}
|
||||||
|
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
|
||||||
|
|
||||||
|
// live nodes always report load of 1. no nodes is load 0
|
||||||
|
double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
|
||||||
|
assertEquals((double)expectedXceiverAvg,
|
||||||
|
namesystem.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
}
|
||||||
|
|
||||||
|
// final sanity check
|
||||||
|
assertEquals(0, namesystem.getNumLiveDataNodes());
|
||||||
|
assertEquals(0, namesystem.getNumDatanodesInService());
|
||||||
|
assertEquals(0.0, namesystem.getTotalLoad(), EPSILON);
|
||||||
|
assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void triggerHeartbeats(List<DataNode> datanodes)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
for (DataNode dn : datanodes) {
|
||||||
|
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||||
|
}
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue