svn merge -c 1611737 FIXES: HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.5@1611739 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Daryn Sharp 2014-07-18 18:04:49 +00:00
parent 8ba7b7587d
commit 61235ba880
10 changed files with 259 additions and 22 deletions

View File

@ -254,6 +254,8 @@ Release 2.5.0 - UNRELEASED
HDFS-6583. Remove clientNode in FileUnderConstructionFeature. (wheat9) HDFS-6583. Remove clientNode in FileUnderConstructionFeature. (wheat9)
HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn)
BUG FIXES BUG FIXES
HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration. HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration.

View File

@ -336,7 +336,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n"); buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n");
buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n"); buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n");
buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n"); buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n");
buffer.append("Xceivers: "+getXceiverCount()+"\n");
buffer.append("Last contact: "+new Date(lastUpdate)+"\n"); buffer.append("Last contact: "+new Date(lastUpdate)+"\n");
return buffer.toString(); return buffer.toString();
} }

View File

@ -636,15 +636,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
// check the communication traffic of the target machine // check the communication traffic of the target machine
if (considerLoad) { if (considerLoad) {
double avgLoad = 0; final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
if (stats != null) { final int nodeLoad = node.getXceiverCount();
int size = stats.getNumDatanodesInService(); if (nodeLoad > maxLoad) {
if (size != 0) { logNodeIsNotChosen(storage,
avgLoad = (double)stats.getTotalLoad()/size; "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
}
}
if (node.getXceiverCount() > (2.0 * avgLoad)) {
logNodeIsNotChosen(storage, "the node is too busy ");
return false; return false;
} }
} }

View File

@ -819,7 +819,9 @@ public class DatanodeManager {
} }
/** Start decommissioning the specified datanode. */ /** Start decommissioning the specified datanode. */
private void startDecommission(DatanodeDescriptor node) { @InterfaceAudience.Private
@VisibleForTesting
public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) { for (DatanodeStorageInfo storage : node.getStorageInfos()) {
LOG.info("Start Decommissioning " + node + " " + storage LOG.info("Start Decommissioning " + node + " " + storage

View File

@ -52,6 +52,12 @@ public interface DatanodeStatistics {
/** @return the xceiver count */ /** @return the xceiver count */
public int getXceiverCount(); public int getXceiverCount();
/** @return average xceiver count for non-decommission(ing|ed) nodes */
public int getInServiceXceiverCount();
/** @return number of non-decommission(ing|ed) nodes */
public int getNumDatanodesInService();
/** /**
* @return the total used space by data nodes for non-DFS purposes * @return the total used space by data nodes for non-DFS purposes
* such as storing temporary files on the local file system * such as storing temporary files on the local file system

View File

@ -150,6 +150,16 @@ class HeartbeatManager implements DatanodeStatistics {
return stats.xceiverCount; return stats.xceiverCount;
} }
@Override
public synchronized int getInServiceXceiverCount() {
return stats.nodesInServiceXceiverCount;
}
@Override
public synchronized int getNumDatanodesInService() {
return stats.nodesInService;
}
@Override @Override
public synchronized long getCacheCapacity() { public synchronized long getCacheCapacity() {
return stats.cacheCapacity; return stats.cacheCapacity;
@ -178,7 +188,7 @@ class HeartbeatManager implements DatanodeStatistics {
} }
synchronized void register(final DatanodeDescriptor d) { synchronized void register(final DatanodeDescriptor d) {
if (!datanodes.contains(d)) { if (!d.isAlive) {
addDatanode(d); addDatanode(d);
//update its timestamp //update its timestamp
@ -191,6 +201,8 @@ class HeartbeatManager implements DatanodeStatistics {
} }
synchronized void addDatanode(final DatanodeDescriptor d) { synchronized void addDatanode(final DatanodeDescriptor d) {
// update in-service node count
stats.add(d);
datanodes.add(d); datanodes.add(d);
d.isAlive = true; d.isAlive = true;
} }
@ -323,6 +335,9 @@ class HeartbeatManager implements DatanodeStatistics {
private long cacheCapacity = 0L; private long cacheCapacity = 0L;
private long cacheUsed = 0L; private long cacheUsed = 0L;
private int nodesInService = 0;
private int nodesInServiceXceiverCount = 0;
private int expiredHeartbeats = 0; private int expiredHeartbeats = 0;
private void add(final DatanodeDescriptor node) { private void add(final DatanodeDescriptor node) {
@ -330,6 +345,8 @@ class HeartbeatManager implements DatanodeStatistics {
blockPoolUsed += node.getBlockPoolUsed(); blockPoolUsed += node.getBlockPoolUsed();
xceiverCount += node.getXceiverCount(); xceiverCount += node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
nodesInService++;
nodesInServiceXceiverCount += node.getXceiverCount();
capacityTotal += node.getCapacity(); capacityTotal += node.getCapacity();
capacityRemaining += node.getRemaining(); capacityRemaining += node.getRemaining();
} else { } else {
@ -344,6 +361,8 @@ class HeartbeatManager implements DatanodeStatistics {
blockPoolUsed -= node.getBlockPoolUsed(); blockPoolUsed -= node.getBlockPoolUsed();
xceiverCount -= node.getXceiverCount(); xceiverCount -= node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
nodesInService--;
nodesInServiceXceiverCount -= node.getXceiverCount();
capacityTotal -= node.getCapacity(); capacityTotal -= node.getCapacity();
capacityRemaining -= node.getRemaining(); capacityRemaining -= node.getRemaining();
} else { } else {

View File

@ -48,6 +48,15 @@ public interface FSClusterStats {
* @return Number of datanodes that are both alive and not decommissioned. * @return Number of datanodes that are both alive and not decommissioned.
*/ */
public int getNumDatanodesInService(); public int getNumDatanodesInService();
/**
* an indication of the average load of non-decommission(ing|ed) nodes
* eligible for block placement
*
* @return average of the in service number of block transfers and block
* writes that are currently occurring on the cluster.
*/
public double getInServiceXceiverAverage();
} }

View File

@ -7319,7 +7319,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
@Override // FSClusterStats @Override // FSClusterStats
public int getNumDatanodesInService() { public int getNumDatanodesInService() {
return getNumLiveDataNodes() - getNumDecomLiveDataNodes(); return datanodeStatistics.getNumDatanodesInService();
}
@Override // for block placement strategy
public double getInServiceXceiverAverage() {
double avgLoad = 0;
final int nodes = getNumDatanodesInService();
if (nodes != 0) {
final int xceivers = datanodeStatistics.getInServiceXceiverCount();
avgLoad = (double)xceivers/nodes;
}
return avgLoad;
} }
public SnapshotManager getSnapshotManager() { public SnapshotManager getSnapshotManager() {

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.PathUtils;
@ -102,6 +103,7 @@ public class TestReplicationPolicyConsiderLoad {
} }
} }
private final double EPSILON = 0.0001;
/** /**
* Tests that chooseTarget with considerLoad set to true correctly calculates * Tests that chooseTarget with considerLoad set to true correctly calculates
* load with decommissioned nodes. * load with decommissioned nodes.
@ -110,14 +112,6 @@ public class TestReplicationPolicyConsiderLoad {
public void testChooseTargetWithDecomNodes() throws IOException { public void testChooseTargetWithDecomNodes() throws IOException {
namenode.getNamesystem().writeLock(); namenode.getNamesystem().writeLock();
try { try {
// Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
// returns false
for (int i = 0; i < 3; i++) {
DatanodeInfo d = dnManager.getDatanodeByXferAddr(
dnrList.get(i).getIpAddr(),
dnrList.get(i).getXferPort());
d.setDecommissioned();
}
String blockPoolId = namenode.getNamesystem().getBlockPoolId(); String blockPoolId = namenode.getNamesystem().getBlockPoolId();
dnManager.handleHeartbeat(dnrList.get(3), dnManager.handleHeartbeat(dnrList.get(3),
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]), BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
@ -134,6 +128,20 @@ public class TestReplicationPolicyConsiderLoad {
blockPoolId, dataNodes[5].getCacheCapacity(), blockPoolId, dataNodes[5].getCacheCapacity(),
dataNodes[5].getCacheRemaining(), dataNodes[5].getCacheRemaining(),
4, 0, 0); 4, 0, 0);
// value in the above heartbeats
final int load = 2 + 4 + 4;
FSNamesystem fsn = namenode.getNamesystem();
assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON);
// Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
// returns false
for (int i = 0; i < 3; i++) {
DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
dnManager.startDecommission(d);
d.setDecommissioned();
}
assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
// Call chooseTarget() // Call chooseTarget()
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager() DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertTrue; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
import static org.junit.Assert.*;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -28,12 +30,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.junit.Test; import org.junit.Test;
@ -153,4 +164,177 @@ public class TestNamenodeCapacityReport {
if (cluster != null) {cluster.shutdown();} if (cluster != null) {cluster.shutdown();}
} }
} }
private static final float EPSILON = 0.0001f;
@Test
public void testXceiverCount() throws Exception {
Configuration conf = new HdfsConfiguration();
// don't waste time retrying if close fails
conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 0);
MiniDFSCluster cluster = null;
final int nodes = 8;
final int fileCount = 5;
final short fileRepl = 3;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(nodes).build();
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
List<DataNode> datanodes = cluster.getDataNodes();
final DistributedFileSystem fs = cluster.getFileSystem();
// trigger heartbeats in case not already sent
triggerHeartbeats(datanodes);
// check that all nodes are live and in service
int expectedTotalLoad = nodes; // xceiver server adds 1 to load
int expectedInServiceNodes = nodes;
int expectedInServiceLoad = nodes;
assertEquals(nodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
namesystem.getInServiceXceiverAverage(), EPSILON);
// shutdown half the nodes and force a heartbeat check to ensure
// counts are accurate
for (int i=0; i < nodes/2; i++) {
DataNode dn = datanodes.get(i);
DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId());
dn.shutdown();
dnd.setLastUpdate(0L);
BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
expectedInServiceNodes--;
assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
}
// restart the nodes to verify that counts are correct after
// node re-registration
cluster.restartDataNodes();
cluster.waitActive();
datanodes = cluster.getDataNodes();
expectedInServiceNodes = nodes;
assertEquals(nodes, datanodes.size());
assertEquals(nodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
namesystem.getInServiceXceiverAverage(), EPSILON);
// create streams and hsync to force datastreamers to start
DFSOutputStream[] streams = new DFSOutputStream[fileCount];
for (int i=0; i < fileCount; i++) {
streams[i] = (DFSOutputStream)fs.create(new Path("/f"+i), fileRepl)
.getWrappedStream();
streams[i].write("1".getBytes());
streams[i].hsync();
// the load for writers is 2 because both the write xceiver & packet
// responder threads are counted in the load
expectedTotalLoad += 2*fileRepl;
expectedInServiceLoad += 2*fileRepl;
}
// force nodes to send load update
triggerHeartbeats(datanodes);
assertEquals(nodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes,
namesystem.getNumDatanodesInService());
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
namesystem.getInServiceXceiverAverage(), EPSILON);
// decomm a few nodes, substract their load from the expected load,
// trigger heartbeat to force load update
for (int i=0; i < fileRepl; i++) {
expectedInServiceNodes--;
DatanodeDescriptor dnd =
dnm.getDatanode(datanodes.get(i).getDatanodeId());
expectedInServiceLoad -= dnd.getXceiverCount();
dnm.startDecommission(dnd);
DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
Thread.sleep(100);
assertEquals(nodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes,
namesystem.getNumDatanodesInService());
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
namesystem.getInServiceXceiverAverage(), EPSILON);
}
// check expected load while closing each stream. recalc expected
// load based on whether the nodes in the pipeline are decomm
for (int i=0; i < fileCount; i++) {
int decomm = 0;
for (DatanodeInfo dni : streams[i].getPipeline()) {
DatanodeDescriptor dnd = dnm.getDatanode(dni);
expectedTotalLoad -= 2;
if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) {
decomm++;
} else {
expectedInServiceLoad -= 2;
}
}
try {
streams[i].close();
} catch (IOException ioe) {
// nodes will go decommissioned even if there's a UC block whose
// other locations are decommissioned too. we'll ignore that
// bug for now
if (decomm < fileRepl) {
throw ioe;
}
}
triggerHeartbeats(datanodes);
// verify node count and loads
assertEquals(nodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes,
namesystem.getNumDatanodesInService());
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
namesystem.getInServiceXceiverAverage(), EPSILON);
}
// shutdown each node, verify node counts based on decomm state
for (int i=0; i < nodes; i++) {
DataNode dn = datanodes.get(i);
dn.shutdown();
// force it to appear dead so live count decreases
DatanodeDescriptor dnDesc = dnm.getDatanode(dn.getDatanodeId());
dnDesc.setLastUpdate(0L);
BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
assertEquals(nodes-1-i, namesystem.getNumLiveDataNodes());
// first few nodes are already out of service
if (i >= fileRepl) {
expectedInServiceNodes--;
}
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
// live nodes always report load of 1. no nodes is load 0
double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
assertEquals((double)expectedXceiverAvg,
namesystem.getInServiceXceiverAverage(), EPSILON);
}
// final sanity check
assertEquals(0, namesystem.getNumLiveDataNodes());
assertEquals(0, namesystem.getNumDatanodesInService());
assertEquals(0.0, namesystem.getTotalLoad(), EPSILON);
assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private void triggerHeartbeats(List<DataNode> datanodes)
throws IOException, InterruptedException {
for (DataNode dn : datanodes) {
DataNodeTestUtils.triggerHeartbeat(dn);
}
Thread.sleep(100);
}
} }