From 56a13a6a59cb128cf6fdac78a074faf7e5603967 Mon Sep 17 00:00:00 2001 From: Kai Zheng Date: Thu, 22 Dec 2016 14:18:54 +0800 Subject: [PATCH] HDFS-11216. Add remoteBytesRead counter metrics for erasure coding reconstruction task. Contributed by Sammi Chen --- .../apache/hadoop/test/MetricsAsserts.java | 7 +++++++ .../erasurecode/StripedBlockReader.java | 11 +++++++--- .../StripedBlockReconstructor.java | 1 + .../erasurecode/StripedReconstructor.java | 14 +++++++++++-- .../datanode/metrics/DataNodeMetrics.java | 6 ++++++ .../TestDataNodeErasureCodingMetrics.java | 21 ++++++++++++++++++- 6 files changed, 54 insertions(+), 6 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java index 5d87b0732f9..a7bbe841366 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java @@ -236,6 +236,13 @@ public class MetricsAsserts { return captor.getValue(); } + public static long getLongCounterWithoutCheck(String name, + MetricsRecordBuilder rb) { + ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); + verify(rb, atLeast(0)).addCounter(eqName(info(name, "")), captor.capture()); + return captor.getValue(); + } + public static String getStringMetric(String name, MetricsRecordBuilder rb) { ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java index 0f7c5c77e19..556158c6609 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java @@ -65,6 +65,7 @@ class StripedBlockReader { private final DatanodeInfo source; private BlockReader blockReader; private ByteBuffer buffer; + private boolean isLocal; StripedBlockReader(StripedReader stripedReader, DataNode datanode, Configuration conf, short index, ExtendedBlock block, @@ -76,6 +77,7 @@ class StripedBlockReader { this.index = index; this.source = source; this.block = block; + this.isLocal = false; BlockReader tmpBlockReader = createBlockReader(offsetInBlock); if (tmpBlockReader != null) { @@ -116,10 +118,13 @@ class StripedBlockReader { * * TODO: add proper tracer */ + Peer peer = newConnectedPeer(block, dnAddr, blockToken, source); + if (peer.isLocal()) { + this.isLocal = true; + } return BlockReaderRemote.newBlockReader( "dummy", block, blockToken, offsetInBlock, - block.getNumBytes() - offsetInBlock, true, - "", newConnectedPeer(block, dnAddr, blockToken, source), source, + block.getNumBytes() - offsetInBlock, true, "", peer, source, null, stripedReader.getCachingStrategy(), datanode.getTracer(), -1); } catch (IOException e) { LOG.info("Exception while creating remote block reader, datanode {}", @@ -187,7 +192,7 @@ class StripedBlockReader { break; } n += nread; - stripedReader.getReconstructor().incrBytesRead(nread); + stripedReader.getReconstructor().incrBytesRead(isLocal, nread); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java index 5554d6863e1..a1da536ba58 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java @@ -70,6 +70,7 @@ class StripedBlockReconstructor extends StripedReconstructor final DataNodeMetrics metrics = getDatanode().getMetrics(); metrics.incrECReconstructionTasks(); metrics.incrECReconstructionBytesRead(getBytesRead()); + metrics.incrECReconstructionRemoteBytesRead(getRemoteBytesRead()); metrics.incrECReconstructionBytesWritten(getBytesWritten()); getStripedReader().close(); stripedWriter.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index 68769f786cd..cd17864c943 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -118,6 +118,7 @@ abstract class StripedReconstructor { // metrics private AtomicLong bytesRead = new AtomicLong(0); private AtomicLong bytesWritten = new AtomicLong(0); + private AtomicLong remoteBytesRead = new AtomicLong(0); StripedReconstructor(ErasureCodingWorker worker, StripedReconstructionInfo stripedReconInfo) { @@ -138,8 +139,13 @@ abstract class StripedReconstructor { positionInBlock = 0L; } - public void incrBytesRead(long delta) { - bytesRead.addAndGet(delta); + public void incrBytesRead(boolean local, long delta) { + if (local) { + bytesRead.addAndGet(delta); + } else { + bytesRead.addAndGet(delta); + remoteBytesRead.addAndGet(delta); + } } public void incrBytesWritten(long delta) { @@ -150,6 +156,10 @@ abstract class StripedReconstructor { return bytesRead.get(); } + public long getRemoteBytesRead() { + return remoteBytesRead.get(); + } + public long getBytesWritten() { return bytesWritten.get(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index e09a85f8ebd..0d82fed770d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -140,6 +140,8 @@ public class DataNodeMetrics { MutableCounterLong ecReconstructionBytesRead; @Metric("Bytes written by erasure coding worker") MutableCounterLong ecReconstructionBytesWritten; + @Metric("Bytes remote read by erasure coding worker") + MutableCounterLong ecReconstructionRemoteBytesRead; final MetricsRegistry registry = new MetricsRegistry("datanode"); final String name; @@ -459,6 +461,10 @@ public class DataNodeMetrics { ecReconstructionBytesRead.incr(bytes); } + public void incrECReconstructionRemoteBytesRead(long bytes) { + ecReconstructionRemoteBytesRead.incr(bytes); + } + public void incrECReconstructionBytesWritten(long bytes) { ecReconstructionBytesWritten.incr(bytes); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java index 64ddbd7a2f8..7e64214dd9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounterWithoutCheck; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -98,6 +99,8 @@ public class TestDataNodeErasureCodingMetrics { blockGroupSize, getLongMetric("EcReconstructionBytesRead")); Assert.assertEquals("EcReconstructionBytesWritten should be ", blockSize, getLongMetric("EcReconstructionBytesWritten")); + Assert.assertEquals("EcReconstructionRemoteBytesRead should be ", + 0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead")); } // A partial block, reconstruct the partial block @@ -110,6 +113,8 @@ public class TestDataNodeErasureCodingMetrics { fileLen, getLongMetric("EcReconstructionBytesRead")); Assert.assertEquals("EcReconstructionBytesWritten should be ", fileLen, getLongMetric("EcReconstructionBytesWritten")); + Assert.assertEquals("EcReconstructionRemoteBytesRead should be ", + 0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead")); } // 1 full block + 5 partial block, reconstruct the full block @@ -121,8 +126,10 @@ public class TestDataNodeErasureCodingMetrics { Assert.assertEquals("ecReconstructionBytesRead should be ", cellSize * dataBlocks + cellSize + cellSize / 10, getLongMetric("EcReconstructionBytesRead")); - Assert.assertEquals("ecReconstructionBytesWritten should be ", + Assert.assertEquals("EcReconstructionBytesWritten should be ", blockSize, getLongMetric("EcReconstructionBytesWritten")); + Assert.assertEquals("EcReconstructionRemoteBytesRead should be ", + 0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead")); } // 1 full block + 5 partial block, reconstruct the partial block @@ -137,6 +144,8 @@ public class TestDataNodeErasureCodingMetrics { Assert.assertEquals("ecReconstructionBytesWritten should be ", cellSize + cellSize / 10, getLongMetric("EcReconstructionBytesWritten")); + Assert.assertEquals("EcReconstructionRemoteBytesRead should be ", + 0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead")); } private long getLongMetric(String metricName) { @@ -149,6 +158,16 @@ public class TestDataNodeErasureCodingMetrics { return metricValue; } + private long getLongMetricWithoutCheck(String metricName) { + long metricValue = 0; + // Add all reconstruction metric value from all data nodes + for (DataNode dn : cluster.getDataNodes()) { + MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name()); + metricValue += getLongCounterWithoutCheck(metricName, rb); + } + return metricValue; + } + private void doTest(String fileName, int fileLen, int deadNodeIndex) throws Exception { assertTrue(fileLen > 0);