diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index adbf025cd04..ec8c79be1ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -125,7 +125,9 @@ class BPServiceActor implements Runnable { this.initialRegistrationComplete = lifelineNnAddr != null ? new CountDownLatch(1) : null; this.dnConf = dn.getDnConf(); - this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval); + this.ibrManager = new IncrementalBlockReportManager( + dnConf.ibrInterval, + dn.getMetrics()); prevBlockReportId = ThreadLocalRandom.current().nextLong(); scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval, @@ -349,7 +351,7 @@ class BPServiceActor implements Runnable { // or we will report an RBW replica after the BlockReport already reports // a FINALIZED one. ibrManager.sendIBRs(bpNamenode, bpRegistration, - bpos.getBlockPoolId(), dn.getMetrics()); + bpos.getBlockPoolId()); long brCreateStartTime = monotonicNow(); Map perVolumeBlockLists = @@ -672,7 +674,7 @@ class BPServiceActor implements Runnable { } if (ibrManager.sendImmediately() || sendHeartbeat) { ibrManager.sendIBRs(bpNamenode, bpRegistration, - bpos.getBlockPoolId(), dn.getMetrics()); + bpos.getBlockPoolId()); } List cmds = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java index e95142db872..1779374f573 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java @@ -52,6 +52,11 @@ class IncrementalBlockReportManager { /** The blocks in this IBR. */ final Map blocks = Maps.newHashMap(); + private DataNodeMetrics dnMetrics; + PerStorageIBR(final DataNodeMetrics dnMetrics) { + this.dnMetrics = dnMetrics; + } + /** * Remove the given block from this IBR * @return true if the block was removed; otherwise, return false. @@ -76,6 +81,25 @@ class IncrementalBlockReportManager { /** Put the block to this IBR. */ void put(ReceivedDeletedBlockInfo rdbi) { blocks.put(rdbi.getBlock(), rdbi); + increaseBlocksCounter(rdbi); + } + + private void increaseBlocksCounter( + final ReceivedDeletedBlockInfo receivedDeletedBlockInfo) { + switch (receivedDeletedBlockInfo.getStatus()) { + case RECEIVING_BLOCK: + dnMetrics.incrBlocksReceivingInPendingIBR(); + break; + case RECEIVED_BLOCK: + dnMetrics.incrBlocksReceivedInPendingIBR(); + break; + case DELETED_BLOCK: + dnMetrics.incrBlocksDeletedInPendingIBR(); + break; + default: + break; + } + dnMetrics.incrBlocksInPendingIBR(); } /** @@ -114,10 +138,14 @@ class IncrementalBlockReportManager { /** The timestamp of the last IBR. */ private volatile long lastIBR; + private DataNodeMetrics dnMetrics; - IncrementalBlockReportManager(final long ibrInterval) { + IncrementalBlockReportManager( + final long ibrInterval, + final DataNodeMetrics dnMetrics) { this.ibrInterval = ibrInterval; this.lastIBR = monotonicNow() - ibrInterval; + this.dnMetrics = dnMetrics; } boolean sendImmediately() { @@ -147,6 +175,10 @@ class IncrementalBlockReportManager { reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi)); } } + + /* set blocks to zero */ + this.dnMetrics.resetBlocksInPendingIBR(); + readyToSend = false; return reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]); } @@ -162,7 +194,7 @@ class IncrementalBlockReportManager { /** Send IBRs to namenode. */ void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration, - String bpid, DataNodeMetrics metrics) throws IOException { + String bpid) throws IOException { // Generate a list of the pending reports for each storage under the lock final StorageReceivedDeletedBlocks[] reports = generateIBRs(); if (reports.length == 0) { @@ -180,8 +212,9 @@ class IncrementalBlockReportManager { namenode.blockReceivedAndDeleted(registration, bpid, reports); success = true; } finally { - metrics.addIncrementalBlockReport(monotonicNow() - startTime); + if (success) { + dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime); lastIBR = startTime; } else { // If we didn't succeed in sending the report, put all of the @@ -199,7 +232,7 @@ class IncrementalBlockReportManager { // This is the first time we are adding incremental BR state for // this storage so create a new map. This is required once per // storage, per service actor. - perStorage = new PerStorageIBR(); + perStorage = new PerStorageIBR(dnMetrics); pendingIBRs.put(storage, perStorage); } return perStorage; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index 085762bf466..eb5d7f0eb39 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -30,6 +30,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableQuantiles; import org.apache.hadoop.metrics2.lib.MutableRate; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.source.JvmMetrics; import java.util.concurrent.ThreadLocalRandom; @@ -126,6 +127,15 @@ public class DataNodeMetrics { @Metric MutableRate sendDataPacketTransferNanos; final MutableQuantiles[] sendDataPacketTransferNanosQuantiles; + @Metric("Count of blocks in pending IBR") + private MutableGaugeLong blocksInPendingIBR; + @Metric("Count of blocks at receiving status in pending IBR") + private MutableGaugeLong blocksReceivingInPendingIBR; + @Metric("Count of blocks at received status in pending IBR") + private MutableGaugeLong blocksReceivedInPendingIBR; + @Metric("Count of blocks at deleted status in pending IBR") + private MutableGaugeLong blocksDeletedInPendingIBR; + final MetricsRegistry registry = new MetricsRegistry("datanode"); final String name; JvmMetrics jvmMetrics = null; @@ -415,4 +425,30 @@ public class DataNodeMetrics { q.add(latencyMs); } } + + /** + * Resets blocks in pending IBR to zero. + */ + public void resetBlocksInPendingIBR() { + blocksInPendingIBR.set(0); + blocksReceivingInPendingIBR.set(0); + blocksReceivedInPendingIBR.set(0); + blocksDeletedInPendingIBR.set(0); + } + + public void incrBlocksInPendingIBR() { + blocksInPendingIBR.incr(); + } + + public void incrBlocksReceivingInPendingIBR() { + blocksReceivingInPendingIBR.incr(); + } + + public void incrBlocksReceivedInPendingIBR() { + blocksReceivedInPendingIBR.incr(); + } + + public void incrBlocksDeletedInPendingIBR() { + blocksDeletedInPendingIBR.incr(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockCountersInPendingIBR.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockCountersInPendingIBR.java new file mode 100644 index 00000000000..43d39a6924c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockCountersInPendingIBR.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.timeout; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.BlockReportOptions; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.test.MetricsAsserts; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Test counters for number of blocks in pending IBR. + */ +public class TestBlockCountersInPendingIBR { + + @Test + public void testBlockCounters() throws Exception { + final Configuration conf = new HdfsConfiguration(); + + /* + * Set a really long value for dfs.blockreport.intervalMsec and + * dfs.heartbeat.interval, so that incremental block reports and heartbeats + * won't be sent during this test unless they're triggered manually. + */ + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L); + + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + final DatanodeProtocolClientSideTranslatorPB spy = + InternalDataNodeTestUtils.spyOnBposToNN( + cluster.getDataNodes().get(0), cluster.getNameNode()); + final DataNode datanode = cluster.getDataNodes().get(0); + + /* We should get 0 incremental block report. */ + Mockito.verify(spy, timeout(60000).times(0)).blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + + /* + * Create fake blocks notification on the DataNode. This will be sent with + * the next incremental block report. + */ + final BPServiceActor actor = + datanode.getAllBpOs().get(0).getBPServiceActors().get(0); + final FsDatasetSpi dataset = datanode.getFSDataset(); + final DatanodeStorage storage; + try (FsDatasetSpi.FsVolumeReferences volumes = + dataset.getFsVolumeReferences()) { + storage = dataset.getStorage(volumes.get(0).getStorageID()); + } + + ReceivedDeletedBlockInfo rdbi = null; + /* block at status of RECEIVING_BLOCK */ + rdbi = new ReceivedDeletedBlockInfo( + new Block(5678, 512, 1000), BlockStatus.RECEIVING_BLOCK, null); + actor.getIbrManager().addRDBI(rdbi, storage); + + /* block at status of RECEIVED_BLOCK */ + rdbi = new ReceivedDeletedBlockInfo( + new Block(5679, 512, 1000), BlockStatus.RECEIVED_BLOCK, null); + actor.getIbrManager().addRDBI(rdbi, storage); + + /* block at status of DELETED_BLOCK */ + rdbi = new ReceivedDeletedBlockInfo( + new Block(5680, 512, 1000), BlockStatus.DELETED_BLOCK, null); + actor.getIbrManager().addRDBI(rdbi, storage); + + /* verify counters before sending IBR */ + verifyBlockCounters(datanode, 3, 1, 1, 1); + + /* Manually trigger a block report. */ + datanode.triggerBlockReport( + new BlockReportOptions.Factory(). + setIncremental(true). + build() + ); + + /* + * triggerBlockReport returns before the block report is actually sent. Wait + * for it to be sent here. + */ + Mockito.verify(spy, timeout(60000).times(1)). + blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); + + /* verify counters after sending IBR */ + verifyBlockCounters(datanode, 0, 0, 0, 0); + + cluster.shutdown(); + } + + + private void verifyBlockCounters(final DataNode datanode, + final long blocksInPendingIBR, final long blocksReceivingInPendingIBR, + final long blocksReceivedInPendingIBR, + final long blocksDeletedInPendingIBR) { + + final MetricsRecordBuilder m = MetricsAsserts + .getMetrics(datanode.getMetrics().name()); + + MetricsAsserts.assertGauge("BlocksInPendingIBR", + blocksInPendingIBR, m); + MetricsAsserts.assertGauge("BlocksReceivingInPendingIBR", + blocksReceivingInPendingIBR, m); + MetricsAsserts.assertGauge("BlocksReceivedInPendingIBR", + blocksReceivedInPendingIBR, m); + MetricsAsserts.assertGauge("BlocksDeletedInPendingIBR", + blocksDeletedInPendingIBR, m); + } +}