HDFS-15242. Add metrics for operations hold lock times of FsDatasetImpl. Contributed by Xiaoqiao He.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
Reviewed-by: Inigo Goiri <inigoiri@apache.org>
This commit is contained in:
He Xiaoqiao 2020-04-01 16:34:25 -07:00 committed by Wei-Chiu Chuang
parent c613296dc8
commit d3b5951572
4 changed files with 210 additions and 20 deletions

View File

@ -453,6 +453,22 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `EcReconstructionBytesRead` | Total number of bytes read by erasure coding worker | | `EcReconstructionBytesRead` | Total number of bytes read by erasure coding worker |
| `EcReconstructionBytesWritten` | Total number of bytes written by erasure coding worker | | `EcReconstructionBytesWritten` | Total number of bytes written by erasure coding worker |
| `EcReconstructionRemoteBytesRead` | Total number of bytes remote read by erasure coding worker | | `EcReconstructionRemoteBytesRead` | Total number of bytes remote read by erasure coding worker |
| `CreateRbwOpNumOps` | Total number of create rbw operations |
| `CreateRbwOpAvgTime` | Average time of create rbw operations in milliseconds |
| `RecoverRbwOpNumOps` | Total number of recovery rbw operations |
| `RecoverRbwOpAvgTime` | Average time of recovery rbw operations in milliseconds |
| `ConvertTemporaryToRbwOpNumOps` | Total number of convert temporary to rbw operations |
| `ConvertTemporaryToRbwOpAvgTime` | Average time of convert temporary to rbw operations in milliseconds |
| `CreateTemporaryOpNumOps` | Total number of create temporary operations |
| `CreateTemporaryOpAvgTime` | Average time of create temporary operations in milliseconds |
| `FinalizeBlockOpNumOps` | Total number of finalize block operations |
| `FinalizeBlockOpAvgTime` | Average time of finalize block operations in milliseconds |
| `UnfinalizeBlockOpNumOps` | Total number of un-finalize block operations |
| `UnfinalizeBlockOpAvgTime` | Average time of un-finalize block operations in milliseconds |
| `CheckAndUpdateOpNumOps` | Total number of check and update operations |
| `CheckAndUpdateOpAvgTime` | Average time of check and update operations in milliseconds |
| `UpdateReplicaUnderRecoveryOpNumOps` | Total number of update replica under recovery operations |
| `UpdateReplicaUnderRecoveryOpAvgTime` | Average time of update replica under recovery operations in milliseconds |
FsVolume FsVolume
-------- --------

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica; import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -243,6 +244,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
final DataNode datanode; final DataNode datanode;
private final DataNodeMetrics dataNodeMetrics;
final DataStorage dataStorage; final DataStorage dataStorage;
private final FsVolumeList volumes; private final FsVolumeList volumes;
final Map<String, DatanodeStorage> storageMap; final Map<String, DatanodeStorage> storageMap;
@ -284,6 +286,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
) throws IOException { ) throws IOException {
this.fsRunning = true; this.fsRunning = true;
this.datanode = datanode; this.datanode = datanode;
this.dataNodeMetrics = datanode.getMetrics();
this.dataStorage = storage; this.dataStorage = storage;
this.conf = conf; this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
@ -1425,6 +1428,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public ReplicaHandler createRbw( public ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b, StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException { boolean allowLazyPersist) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId()); b.getBlockId());
@ -1485,6 +1489,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref); return new ReplicaHandler(newReplicaInfo, ref);
} finally {
if (dataNodeMetrics != null) {
long createRbwMs = Time.monotonicNow() - startTimeMs;
dataNodeMetrics.addCreateRbwOp(createRbwMs);
}
} }
} }
@ -1493,7 +1502,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException { throws IOException {
LOG.info("Recover RBW replica " + b); LOG.info("Recover RBW replica " + b);
long startTimeMs = Time.monotonicNow();
try {
while (true) { while (true) {
try { try {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
@ -1516,6 +1526,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
datanode.getDnConf().getXceiverStopTimeout()); datanode.getDnConf().getXceiverStopTimeout());
} }
} }
} finally {
if (dataNodeMetrics != null) {
long recoverRbwMs = Time.monotonicNow() - startTimeMs;
dataNodeMetrics.addRecoverRbwOp(recoverRbwMs);
}
}
} }
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
@ -1581,7 +1597,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi @Override // FsDatasetSpi
public ReplicaInPipeline convertTemporaryToRbw( public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException { final ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final long blockId = b.getBlockId(); final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp(); final long expectedGs = b.getGenerationStamp();
@ -1637,6 +1653,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// overwrite the RBW in the volume map // overwrite the RBW in the volume map
volumeMap.add(b.getBlockPoolId(), rbw.getReplicaInfo()); volumeMap.add(b.getBlockPoolId(), rbw.getReplicaInfo());
return rbw; return rbw;
} finally {
if (dataNodeMetrics != null) {
long convertTemporaryToRbwMs = Time.monotonicNow() - startTimeMs;
dataNodeMetrics.addConvertTemporaryToRbwOp(convertTemporaryToRbwMs);
}
} }
} }
@ -1701,6 +1722,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Stop the previous writer // Stop the previous writer
((ReplicaInPipeline)lastFoundReplicaInfo).stopWriter(writerStopTimeoutMs); ((ReplicaInPipeline)lastFoundReplicaInfo).stopWriter(writerStopTimeoutMs);
} while (true); } while (true);
long holdLockTimeMs = Time.monotonicNow() - startTimeMs;
if (lastFoundReplicaInfo != null if (lastFoundReplicaInfo != null
&& !isReplicaProvided(lastFoundReplicaInfo)) { && !isReplicaProvided(lastFoundReplicaInfo)) {
// Old blockfile should be deleted synchronously as it might collide // Old blockfile should be deleted synchronously as it might collide
@ -1709,6 +1731,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }, invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
false); false);
} }
long startHoldLockTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
.getNumBytes()); .getNumBytes());
@ -1723,6 +1746,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref); return new ReplicaHandler(newReplicaInfo, ref);
} finally {
if (dataNodeMetrics != null) {
// Create temporary operation hold write lock twice.
long createTemporaryOpMs = Time.monotonicNow() - startHoldLockTimeMs
+ holdLockTimeMs;
dataNodeMetrics.addCreateTemporaryOp(createTemporaryOpMs);
}
} }
} }
@ -1760,6 +1790,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throws IOException { throws IOException {
ReplicaInfo replicaInfo = null; ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null; ReplicaInfo finalizedReplicaInfo = null;
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
if (Thread.interrupted()) { if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads // Don't allow data modifications from interrupted threads
@ -1772,6 +1803,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return; return;
} }
finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo); finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo);
} finally {
if (dataNodeMetrics != null) {
long finalizeBlockMs = Time.monotonicNow() - startTimeMs;
dataNodeMetrics.addFinalizeBlockOp(finalizeBlockMs);
}
} }
/* /*
* Sync the directory after rename from tmp/rbw to Finalized if * Sync the directory after rename from tmp/rbw to Finalized if
@ -1836,6 +1872,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/ */
@Override // FsDatasetSpi @Override // FsDatasetSpi
public void unfinalizeBlock(ExtendedBlock b) throws IOException { public void unfinalizeBlock(ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock()); b.getLocalBlock());
@ -1853,6 +1890,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
b.getBlockId(), true); b.getBlockId(), true);
} }
} }
} finally {
if (dataNodeMetrics != null) {
long unFinalizedBlockMs = Time.monotonicNow() - startTimeMs;
dataNodeMetrics.addUnfinalizeBlockOp(unFinalizedBlockMs);
}
} }
} }
@ -2406,6 +2448,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Block corruptBlock = null; Block corruptBlock = null;
ReplicaInfo memBlockInfo; ReplicaInfo memBlockInfo;
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
memBlockInfo = volumeMap.get(bpid, blockId); memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null && if (memBlockInfo != null &&
@ -2581,6 +2624,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ memBlockInfo.getBlockDataLength()); + memBlockInfo.getBlockDataLength());
memBlockInfo.setNumBytes(memBlockInfo.getBlockDataLength()); memBlockInfo.setNumBytes(memBlockInfo.getBlockDataLength());
} }
} finally {
if (dataNodeMetrics != null) {
long checkAndUpdateTimeMs = Time.monotonicNow() - startTimeMs;
dataNodeMetrics.addCheckAndUpdateOp(checkAndUpdateTimeMs);
}
} }
// Send corrupt block report outside the lock // Send corrupt block report outside the lock
@ -2714,6 +2762,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final long recoveryId, final long recoveryId,
final long newBlockId, final long newBlockId,
final long newlength) throws IOException { final long newlength) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = datasetWriteLock.acquire()) { try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
//get replica //get replica
final String bpid = oldBlock.getBlockPoolId(); final String bpid = oldBlock.getBlockPoolId();
@ -2770,6 +2819,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
checkReplicaFiles(finalized); checkReplicaFiles(finalized);
return finalized; return finalized;
} finally {
if (dataNodeMetrics != null) {
long updateReplicaUnderRecoveryMs = Time.monotonicNow() - startTimeMs;
dataNodeMetrics.addUpdateReplicaUnderRecoveryOp(
updateReplicaUnderRecoveryMs);
}
} }
} }

View File

@ -167,6 +167,16 @@ public class DataNodeMetrics {
@Metric("Rate of processed commands of all BPServiceActors") @Metric("Rate of processed commands of all BPServiceActors")
private MutableRate processedCommandsOp; private MutableRate processedCommandsOp;
// FsDatasetImpl local file process metrics.
@Metric private MutableRate createRbwOp;
@Metric private MutableRate recoverRbwOp;
@Metric private MutableRate convertTemporaryToRbwOp;
@Metric private MutableRate createTemporaryOp;
@Metric private MutableRate finalizeBlockOp;
@Metric private MutableRate unfinalizeBlockOp;
@Metric private MutableRate checkAndUpdateOp;
@Metric private MutableRate updateReplicaUnderRecoveryOp;
final MetricsRegistry registry = new MetricsRegistry("datanode"); final MetricsRegistry registry = new MetricsRegistry("datanode");
@Metric("Milliseconds spent on calling NN rpc") @Metric("Milliseconds spent on calling NN rpc")
private MutableRatesWithAggregation private MutableRatesWithAggregation
@ -574,4 +584,68 @@ public class DataNodeMetrics {
public void addNumProcessedCommands(long latency) { public void addNumProcessedCommands(long latency) {
processedCommandsOp.add(latency); processedCommandsOp.add(latency);
} }
/**
* Add addCreateRbwOp metrics.
* @param latency milliseconds of create RBW file
*/
public void addCreateRbwOp(long latency) {
createRbwOp.add(latency);
}
/**
* Add addRecoverRbwOp metrics.
* @param latency milliseconds of recovery RBW file
*/
public void addRecoverRbwOp(long latency) {
recoverRbwOp.add(latency);
}
/**
* Add addConvertTemporaryToRbwOp metrics.
* @param latency milliseconds of convert temporary to RBW file
*/
public void addConvertTemporaryToRbwOp(long latency) {
convertTemporaryToRbwOp.add(latency);
}
/**
* Add addCreateTemporaryOp metrics.
* @param latency milliseconds of create temporary block file
*/
public void addCreateTemporaryOp(long latency) {
createTemporaryOp.add(latency);
}
/**
* Add addFinalizeBlockOp metrics.
* @param latency milliseconds of finalize block
*/
public void addFinalizeBlockOp(long latency) {
finalizeBlockOp.add(latency);
}
/**
* Add addUnfinalizeBlockOp metrics.
* @param latency milliseconds of un-finalize block file
*/
public void addUnfinalizeBlockOp(long latency) {
unfinalizeBlockOp.add(latency);
}
/**
* Add addCheckAndUpdateOp metrics.
* @param latency milliseconds of check and update block file
*/
public void addCheckAndUpdateOp(long latency) {
checkAndUpdateOp.add(latency);
}
/**
* Add addUpdateReplicaUnderRecoveryOp metrics.
* @param latency milliseconds of update and replica under recovery block file
*/
public void addUpdateReplicaUnderRecoveryOp(long latency) {
updateReplicaUnderRecoveryOp.add(latency);
}
} }

View File

@ -37,6 +37,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import net.jcip.annotations.NotThreadSafe; import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -159,6 +160,50 @@ public class TestDataNodeMetrics {
} }
} }
/**
* HDFS-15242: This function ensures that writing causes some metrics
* of FSDatasetImpl to increment.
*/
@Test
public void testFsDatasetMetrics() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
String bpid = cluster.getNameNode().getNamesystem().getBlockPoolId();
List<DataNode> datanodes = cluster.getDataNodes();
DataNode datanode = datanodes.get(0);
// Verify both of metrics set to 0 when initialize.
MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
assertCounter("CreateRbwOpNumOps", 0L, rb);
assertCounter("CreateTemporaryOpNumOps", 0L, rb);
assertCounter("FinalizeBlockOpNumOps", 0L, rb);
// Write into a file to trigger DN metrics.
DistributedFileSystem fs = cluster.getFileSystem();
Path testFile = new Path("/testBlockMetrics.txt");
FSDataOutputStream fout = fs.create(testFile);
fout.write(new byte[1]);
fout.hsync();
fout.close();
// Create temporary block file to trigger DN metrics.
final ExtendedBlock block = new ExtendedBlock(bpid, 1, 1, 2001);
datanode.data.createTemporary(StorageType.DEFAULT, null, block, false);
// Verify both of metrics value has updated after do some operations.
rb = getMetrics(datanode.getMetrics().name());
assertCounter("CreateRbwOpNumOps", 1L, rb);
assertCounter("CreateTemporaryOpNumOps", 1L, rb);
assertCounter("FinalizeBlockOpNumOps", 1L, rb);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/** /**
* Tests that round-trip acks in a datanode write pipeline are correctly * Tests that round-trip acks in a datanode write pipeline are correctly
* measured. * measured.