HDFS-12288. Fix DataNode's xceiver count calculation. Contributed by Lisheng Sun.

(cherry picked from commit 6e04b00df1)
This commit is contained in:
Inigo Goiri 2020-05-23 09:58:19 -07:00 committed by Wei-Chiu Chuang
parent e4217be749
commit 813aafcee2
9 changed files with 141 additions and 23 deletions

View File

@ -544,7 +544,7 @@ class BPServiceActor implements Runnable {
dn.getFSDataset().getCacheCapacity(), dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(), dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(), dn.getXmitsInProgress(),
dn.getXceiverCount(), dn.getActiveTransferThreadCount(),
numFailedVolumes, numFailedVolumes,
volumeFailureSummary, volumeFailureSummary,
requestBlockReportLease, requestBlockReportLease,

View File

@ -1368,6 +1368,7 @@ class BlockReceiver implements Closeable {
*/ */
@Override @Override
public void run() { public void run() {
datanode.metrics.incrDataNodePacketResponderCount();
boolean lastPacketInBlock = false; boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (isRunning() && !lastPacketInBlock) { while (isRunning() && !lastPacketInBlock) {
@ -1505,6 +1506,9 @@ class BlockReceiver implements Closeable {
} }
} }
} }
// Any exception will be caught and processed in the previous loop, so we
// will always arrive here when the thread is exiting.
datanode.metrics.decrDataNodePacketResponderCount();
LOG.info(myString + " terminating"); LOG.info(myString + " terminating");
} }

View File

@ -601,17 +601,22 @@ public class BlockRecoveryWorker {
Daemon d = new Daemon(datanode.threadGroup, new Runnable() { Daemon d = new Daemon(datanode.threadGroup, new Runnable() {
@Override @Override
public void run() { public void run() {
for(RecoveringBlock b : blocks) { datanode.metrics.incrDataNodeBlockRecoveryWorkerCount();
try { try {
logRecoverBlock(who, b); for (RecoveringBlock b : blocks) {
if (b.isStriped()) { try {
new RecoveryTaskStriped((RecoveringStripedBlock) b).recover(); logRecoverBlock(who, b);
} else { if (b.isStriped()) {
new RecoveryTaskContiguous(b).recover(); new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
} else {
new RecoveryTaskContiguous(b).recover();
}
} catch (IOException e) {
LOG.warn("recover Block: {} FAILED: {}", b, e);
} }
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED: " + b, e);
} }
} finally {
datanode.metrics.decrDataNodeBlockRecoveryWorkerCount();
} }
} }
}); });

View File

@ -2148,6 +2148,8 @@ public class DataNode extends ReconfigurableBase
} }
if (metrics != null) { if (metrics != null) {
metrics.setDataNodeActiveXceiversCount(0); metrics.setDataNodeActiveXceiversCount(0);
metrics.setDataNodePacketResponderCount(0);
metrics.setDataNodeBlockRecoveryWorkerCount(0);
} }
// IPC server needs to be shutdown late in the process, otherwise // IPC server needs to be shutdown late in the process, otherwise
@ -2246,7 +2248,20 @@ public class DataNode extends ReconfigurableBase
/** Number of concurrent xceivers per node. */ /** Number of concurrent xceivers per node. */
@Override // DataNodeMXBean @Override // DataNodeMXBean
public int getXceiverCount() { public int getXceiverCount() {
return threadGroup == null ? 0 : threadGroup.activeCount(); if (metrics == null) {
return 0;
}
return metrics.getDataNodeActiveXceiverCount();
}
@Override // DataNodeMXBean
public int getActiveTransferThreadCount() {
if (metrics == null) {
return 0;
}
return metrics.getDataNodeActiveXceiverCount()
+ metrics.getDataNodePacketResponderCount()
+ metrics.getDataNodeBlockRecoveryWorkerCount();
} }
@Override // DataNodeMXBean @Override // DataNodeMXBean

View File

@ -104,11 +104,15 @@ public interface DataNodeMXBean {
public String getClusterId(); public String getClusterId();
/** /**
* Returns an estimate of the number of Datanode threads * Returns the number of active xceivers.
* actively transferring blocks.
*/ */
public int getXceiverCount(); public int getXceiverCount();
/**
* Returns the number of Datanode threads actively transferring blocks.
*/
int getActiveTransferThreadCount();
/** /**
* Returns an estimate of the number of data replication/reconstruction tasks * Returns an estimate of the number of data replication/reconstruction tasks
* running currently. * running currently.

View File

@ -109,6 +109,12 @@ public class DataNodeMetrics {
@Metric("Count of active dataNode xceivers") @Metric("Count of active dataNode xceivers")
private MutableGaugeInt dataNodeActiveXceiversCount; private MutableGaugeInt dataNodeActiveXceiversCount;
@Metric("Count of active DataNode packetResponder")
private MutableGaugeInt dataNodePacketResponderCount;
@Metric("Count of active DataNode block recovery worker")
private MutableGaugeInt dataNodeBlockRecoveryWorkerCount;
@Metric MutableRate readBlockOp; @Metric MutableRate readBlockOp;
@Metric MutableRate writeBlockOp; @Metric MutableRate writeBlockOp;
@Metric MutableRate blockChecksumOp; @Metric MutableRate blockChecksumOp;
@ -525,6 +531,42 @@ public class DataNodeMetrics {
dataNodeActiveXceiversCount.set(value); dataNodeActiveXceiversCount.set(value);
} }
/**
 * Returns the current value of the {@code dataNodeActiveXceiversCount} gauge.
 *
 * @return the value currently held by the gauge
 */
public int getDataNodeActiveXceiverCount() {
return dataNodeActiveXceiversCount.value();
}
/**
 * Increments the {@code dataNodePacketResponderCount} gauge.
 */
public void incrDataNodePacketResponderCount() {
dataNodePacketResponderCount.incr();
}
/**
 * Decrements the {@code dataNodePacketResponderCount} gauge.
 */
public void decrDataNodePacketResponderCount() {
dataNodePacketResponderCount.decr();
}
/**
 * Overwrites the {@code dataNodePacketResponderCount} gauge with the given
 * value.
 *
 * @param value the new gauge value
 */
public void setDataNodePacketResponderCount(int value) {
dataNodePacketResponderCount.set(value);
}
/**
 * Returns the current value of the {@code dataNodePacketResponderCount}
 * gauge.
 *
 * @return the value currently held by the gauge
 */
public int getDataNodePacketResponderCount() {
return dataNodePacketResponderCount.value();
}
/**
 * Increments the {@code dataNodeBlockRecoveryWorkerCount} gauge.
 */
public void incrDataNodeBlockRecoveryWorkerCount() {
dataNodeBlockRecoveryWorkerCount.incr();
}
/**
 * Decrements the {@code dataNodeBlockRecoveryWorkerCount} gauge.
 */
public void decrDataNodeBlockRecoveryWorkerCount() {
dataNodeBlockRecoveryWorkerCount.decr();
}
/**
 * Overwrites the {@code dataNodeBlockRecoveryWorkerCount} gauge with the
 * given value.
 *
 * @param value the new gauge value
 */
public void setDataNodeBlockRecoveryWorkerCount(int value) {
dataNodeBlockRecoveryWorkerCount.set(value);
}
/**
 * Returns the current value of the {@code dataNodeBlockRecoveryWorkerCount}
 * gauge.
 *
 * @return the value currently held by the gauge
 */
public int getDataNodeBlockRecoveryWorkerCount() {
return dataNodeBlockRecoveryWorkerCount.value();
}
public void incrECDecodingTime(long decodingTimeNanos) { public void incrECDecodingTime(long decodingTimeNanos) {
ecDecodingTimeNanos.incr(decodingTimeNanos); ecDecodingTimeNanos.incr(decodingTimeNanos);
} }

View File

@ -253,12 +253,11 @@ public class TestDataTransferKeepalive {
} }
/** /**
* Returns the datanode's xceiver count, but subtracts 1, since the * Returns the datanode's active xceiver count.
* DataXceiverServer counts as one.
* *
* @return int xceiver count, not including DataXceiverServer * @return the datanode's active xceivers count.
*/ */
private int getXceiverCountWithoutServer() { private int getXceiverCountWithoutServer() {
return dn.getXceiverCount() - 1; return dn.getXceiverCount();
} }
} }

View File

@ -27,6 +27,7 @@ import static org.junit.Assert.*;
import java.io.Closeable; import java.io.Closeable;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -374,6 +375,57 @@ public class TestDataNodeMetrics {
} }
} }
/**
 * Checks that {@code DataNode#getXceiverCount()} and
 * {@code DataNode#getActiveTransferThreadCount()} agree with the threads found
 * in the DataNode's thread group while one write stream and one read stream
 * are open against a single-DataNode mini cluster.
 *
 * @throws Exception if the cluster cannot be started or an I/O call fails
 */
@Test
public void testDataNodeMXBeanActiveThreadCount() throws Exception {
  Configuration conf = new Configuration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  try {
    // Obtained inside the try so the cluster is shut down even if this fails.
    FileSystem fs = cluster.getFileSystem();
    Path p = new Path("/testfile");
    List<DataNode> datanodes = cluster.getDataNodes();
    assertEquals(1, datanodes.size());
    DataNode datanode = datanodes.get(0);

    // Both streams are held open until the assertions have run (presumably
    // this is what keeps the corresponding DataNode threads alive), and are
    // closed afterwards even when an assertion fails.
    // create a xceiver thread for write
    try (FSDataOutputStream os = fs.create(p)) {
      final byte[] chunk = "testdatastr".getBytes();
      for (int i = 0; i < 1024; i++) {
        os.write(chunk);
      }
      os.hsync();

      // create a xceiver thread for read
      try (InputStream is = fs.open(p)) {
        is.read(new byte[16], 0, 4);

        int threadCount = datanode.threadGroup.activeCount();
        assertTrue(threadCount > 0);
        // activeCount() is only an estimate: over-allocate so enumerate()
        // does not silently drop threads, and walk only the slots it filled
        // so that unfilled (null) entries are never dereferenced.
        Thread[] threads = new Thread[threadCount * 2];
        int enumerated = datanode.threadGroup.enumerate(threads);

        int xceiverCount = 0;
        int responderCount = 0;
        final int recoveryWorkerCount = 0; // not easy to produce
        for (int i = 0; i < enumerated; i++) {
          String threadName = threads[i].getName();
          if (threadName.contains("DataXceiver for client")) {
            xceiverCount++;
          } else if (threadName.contains("PacketResponder")) {
            responderCount++;
          }
        }

        assertEquals(2, xceiverCount);
        assertEquals(1, responderCount);
        assertEquals(0, recoveryWorkerCount);
        assertEquals(xceiverCount, datanode.getXceiverCount());
        assertEquals(xceiverCount + responderCount + recoveryWorkerCount,
            datanode.getActiveTransferThreadCount());
      }
    }
  } finally {
    cluster.shutdown();
  }
}
@Test @Test
public void testDNShouldNotDeleteBlockONTooManyOpenFiles() public void testDNShouldNotDeleteBlockONTooManyOpenFiles()
throws Exception { throws Exception {

View File

@ -226,9 +226,9 @@ public class TestNamenodeCapacityReport {
triggerHeartbeats(datanodes); triggerHeartbeats(datanodes);
// check that all nodes are live and in service // check that all nodes are live and in service
int expectedTotalLoad = nodes; // xceiver server adds 1 to load int expectedTotalLoad = 0;
int expectedInServiceNodes = nodes; int expectedInServiceNodes = nodes;
int expectedInServiceLoad = nodes; int expectedInServiceLoad = 0;
checkClusterHealth(nodes, namesystem, expectedTotalLoad, checkClusterHealth(nodes, namesystem, expectedTotalLoad,
expectedInServiceNodes, expectedInServiceLoad); expectedInServiceNodes, expectedInServiceLoad);
@ -333,10 +333,7 @@ public class TestNamenodeCapacityReport {
expectedInServiceNodes--; expectedInServiceNodes--;
} }
assertEquals(expectedInServiceNodes, getNumDNInService(namesystem)); assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
// live nodes always report load of 1. no nodes is load 0 assertEquals(0, getInServiceXceiverAverage(namesystem), EPSILON);
double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
assertEquals((double)expectedXceiverAvg,
getInServiceXceiverAverage(namesystem), EPSILON);
} }
// final sanity check // final sanity check
checkClusterHealth(0, namesystem, 0.0, 0, 0.0); checkClusterHealth(0, namesystem, 0.0, 0, 0.0);