HDFS-16917 Add transfer rate quantile metrics for DataNode reads (#5397)

Co-authored-by: Ravindra Dingankar <rdingankar@linkedin.com>
rdingankar 2023-02-27 10:26:32 -08:00 committed by GitHub
parent 61f369c43e
commit 0ca5686034
6 changed files with 55 additions and 0 deletions

hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md

@@ -370,6 +370,9 @@ Each metrics record contains tags such as SessionId and Hostname as additional information along with metrics.
|:---- |:---- |
| `BytesWritten` | Total number of bytes written to DataNode |
| `BytesRead` | Total number of bytes read from DataNode |
| `ReadTransferRateNumOps` | Total number of data read transfers |
| `ReadTransferRateAvgTime` | Average transfer rate of bytes read from DataNode, measured in bytes per second. |
| `ReadTransferRate`*num*`s(50/75/90/95/99)thPercentileRate` | The 50/75/90/95/99th percentile of the transfer rate of bytes read from DataNode, measured in bytes per second. |
| `BlocksWritten` | Total number of blocks written to DataNode |
| `BlocksRead` | Total number of blocks read from DataNode |
| `BlocksReplicated` | Total number of blocks replicated |

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -69,6 +69,7 @@ import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -1936,4 +1937,18 @@ public class DFSUtil {
return path.charAt(parent.length()) == Path.SEPARATOR_CHAR
|| parent.equals(Path.SEPARATOR);
}
/**
* Add transfer rate metrics for valid data read and duration values.
* @param metrics metrics for datanodes
* @param read bytes read
* @param duration read duration
*/
public static void addTransferRateMetric(final DataNodeMetrics metrics, final long read, final long duration) {
if (read >= 0 && duration > 0) {
metrics.addReadTransferRate(read * 1000 / duration);
} else {
LOG.warn("Unexpected value for data transfer bytes={} duration={}", read, duration);
}
}
}
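
Because `duration` is in milliseconds, `read * 1000 / duration` converts the sample to bytes per second, and the guard rejects negative byte counts and non-positive durations (avoiding division by zero). A worked example matching the unit test below:

```java
// 100 bytes over 10 ms -> 100 * 1000 / 10 = 10,000 bytes/s
long read = 100;     // bytes transferred
long duration = 10;  // elapsed milliseconds
long bytesPerSecond = read * 1000 / duration; // 10000
```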

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import javax.crypto.SecretKey;
@@ -632,6 +633,7 @@ class DataXceiver extends Receiver implements Runnable {
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
datanode.metrics.incrTotalReadTime(duration);
DFSUtil.addTransferRateMetric(datanode.metrics, read, duration);
} catch ( SocketException ignored ) {
LOG.trace("{}:Ignoring exception while serving {} to {}",
dnR, block, remoteAddress, ignored);
@@ -1122,6 +1124,7 @@ class DataXceiver extends Receiver implements Runnable {
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
datanode.metrics.incrTotalReadTime(duration);
DFSUtil.addTransferRateMetric(datanode.metrics, read, duration);
LOG.info("Copied {} to {}", block, peer.getRemoteAddressString()); LOG.info("Copied {} to {}", block, peer.getRemoteAddressString());
} catch (IOException ioe) { } catch (IOException ioe) {
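
Both call sites reuse the `read` and `duration` values the existing counters are already fed with, so no extra timing is introduced. A hedged sketch of the pattern; `transferBlockBytes()` is a hypothetical stand-in for the actual block-sending call, and the timing code shown is an assumption based on the `incrTotalReadTime(duration)` call above:

```java
// Sketch, not verbatim DataXceiver code: measure one transfer, then feed the
// same numbers to the byte/time counters and the new rate metric.
long startMs = Time.monotonicNow();              // org.apache.hadoop.util.Time
long read = transferBlockBytes();                // hypothetical: bytes sent
long duration = Time.monotonicNow() - startMs;   // elapsed milliseconds
datanode.metrics.incrTotalReadTime(duration);
DFSUtil.addTransferRateMetric(datanode.metrics, read, duration);
```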

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

@@ -61,6 +61,8 @@ public class DataNodeMetrics {
@Metric MutableCounterLong bytesRead;
@Metric("Milliseconds spent reading")
MutableCounterLong totalReadTime;
@Metric private MutableRate readTransferRate;
final private MutableQuantiles[] readTransferRateQuantiles;
@Metric MutableCounterLong blocksWritten;
@Metric MutableCounterLong blocksRead;
@Metric MutableCounterLong blocksReplicated;
@@ -227,6 +229,7 @@ public class DataNodeMetrics {
sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
readTransferRateQuantiles = new MutableQuantiles[len];
for (int i = 0; i < len; i++) {
int interval = intervals[i];
@@ -255,6 +258,10 @@
"ramDiskBlocksLazyPersistWindows" + interval + "s",
"Time between the RamDisk block write and disk persist in ms",
"ops", "latency", interval);
readTransferRateQuantiles[i] = registry.newQuantiles(
"readTransferRate" + interval + "s",
"Rate at which bytes are read from datanode calculated in bytes per second",
"ops", "rate", interval);
}
}
@@ -316,6 +323,13 @@
}
}
public void addReadTransferRate(long readTransferRate) {
this.readTransferRate.add(readTransferRate);
for (MutableQuantiles q : readTransferRateQuantiles) {
q.add(readTransferRate);
}
}
public void addCacheReport(long latency) {
cacheReports.add(latency);
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java

@@ -45,6 +45,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
import java.io.File;
import java.io.IOException;
@@ -71,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.http.HttpConfig;
@@ -1108,4 +1110,18 @@ public class TestDFSUtil {
LambdaTestUtils.intercept(IOException.class, expectedErrorMessage,
()->DFSUtil.getNNServiceRpcAddressesForCluster(conf));
}
@Test
public void testAddTransferRateMetricForValidValues() {
DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
DFSUtil.addTransferRateMetric(mockMetrics, 100, 10);
verify(mockMetrics).addReadTransferRate(10000);
}
@Test
public void testAddTransferRateMetricForInvalidValue() {
DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
DFSUtil.addTransferRateMetric(mockMetrics, 100, 0);
verify(mockMetrics, times(0)).addReadTransferRate(anyLong());
}
}
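
The new tests cover a valid sample and a zero duration. A hypothetical companion test (not part of this commit) for the `read < 0` branch would follow the same shape:

```java
// Hypothetical: negative byte counts must also be dropped, not recorded.
@Test
public void testAddTransferRateMetricForNegativeRead() {
  DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
  DFSUtil.addTransferRateMetric(mockMetrics, -1, 10);
  verify(mockMetrics, times(0)).addReadTransferRate(anyLong());
}
```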

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java

@@ -380,6 +380,7 @@ public class TestDataNodeMetrics {
@Test(timeout=120000)
public void testDataNodeTimeSpend() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + 60);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
final FileSystem fs = cluster.getFileSystem();
@@ -391,6 +392,7 @@
final long startWriteValue = getLongCounter("TotalWriteTime", rb);
final long startReadValue = getLongCounter("TotalReadTime", rb);
assertCounter("ReadTransferRateNumOps", 0L, rb);
final AtomicInteger x = new AtomicInteger(0);
// Lets Metric system update latest metrics
@@ -410,6 +412,8 @@
MetricsRecordBuilder rbNew = getMetrics(datanode.getMetrics().name());
final long endWriteValue = getLongCounter("TotalWriteTime", rbNew);
final long endReadValue = getLongCounter("TotalReadTime", rbNew);
assertCounter("ReadTransferRateNumOps", 1L, rbNew);
assertQuantileGauges("ReadTransferRate" + "60s", rbNew, "Rate");
return endWriteValue > startWriteValue
&& endReadValue > startReadValue;
}