diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 5e2250915d7..de7ecba25a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.util.Time.now; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; +import java.util.ArrayList; import java.util.Collection; import java.util.Map; @@ -270,7 +271,8 @@ class BPServiceActor implements Runnable { private void reportReceivedDeletedBlocks() throws IOException { // Generate a list of the pending reports for each storage under the lock - Map blockArrays = Maps.newHashMap(); + ArrayList reports = + new ArrayList(pendingIncrementalBRperStorage.size()); synchronized (pendingIncrementalBRperStorage) { for (Map.Entry entry : pendingIncrementalBRperStorage.entrySet()) { @@ -283,33 +285,34 @@ class BPServiceActor implements Runnable { pendingReceivedRequests = (pendingReceivedRequests > rdbi.length ? (pendingReceivedRequests - rdbi.length) : 0); - blockArrays.put(storageUuid, rdbi); + reports.add(new StorageReceivedDeletedBlocks(storageUuid, rdbi)); } } } - // Send incremental block reports to the Namenode outside the lock - for (Map.Entry entry : - blockArrays.entrySet()) { - final String storageUuid = entry.getKey(); - final ReceivedDeletedBlockInfo[] rdbi = entry.getValue(); + if (reports.size() == 0) { + // Nothing new to report. + return; + } - StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( - storageUuid, rdbi) }; - boolean success = false; - try { - bpNamenode.blockReceivedAndDeleted(bpRegistration, - bpos.getBlockPoolId(), report); - success = true; - } finally { - if (!success) { - synchronized (pendingIncrementalBRperStorage) { + // Send incremental block reports to the Namenode outside the lock + boolean success = false; + try { + bpNamenode.blockReceivedAndDeleted(bpRegistration, + bpos.getBlockPoolId(), + reports.toArray(new StorageReceivedDeletedBlocks[reports.size()])); + success = true; + } finally { + if (!success) { + synchronized (pendingIncrementalBRperStorage) { + for (StorageReceivedDeletedBlocks report : reports) { // If we didn't succeed in sending the report, put all of the // blocks back onto our queue, but only in the case where we // didn't put something newer in the meantime. PerStoragePendingIncrementalBR perStorageMap = - pendingIncrementalBRperStorage.get(storageUuid); - pendingReceivedRequests += perStorageMap.putMissingBlockInfos(rdbi); + pendingIncrementalBRperStorage.get(report.getStorageID()); + pendingReceivedRequests += + perStorageMap.putMissingBlockInfos(report.getBlocks()); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index c73cd8eead0..ab64f365728 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -983,6 +983,7 @@ class NameNodeRpcServer implements NamenodeProtocols { public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException { verifyRequest(nodeReg); + metrics.incrBlockReceivedAndDeletedOps(); if(blockStateChangeLog.isDebugEnabled()) { blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " +"from "+nodeReg+" "+receivedAndDeletedBlocks.length diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java index af30713e3fa..61fcc13dcec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java @@ -71,6 +71,8 @@ public class NameNodeMetrics { MutableCounterLong listSnapshottableDirOps; @Metric("Number of snapshotDiffReport operations") MutableCounterLong snapshotDiffReportOps; + @Metric("Number of blockReceivedAndDeleted calls") + MutableCounterLong blockReceivedAndDeletedOps; @Metric("Journal transactions") MutableRate transactions; @Metric("Journal syncs") MutableRate syncs; @@ -210,6 +212,10 @@ public class NameNodeMetrics { snapshotDiffReportOps.incr(); } + public void incrBlockReceivedAndDeletedOps() { + blockReceivedAndDeletedOps.incr(); + } + public void addTransaction(long latency) { transactions.add(latency); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 6a0510c2ce3..46c09384cad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2144,17 +2144,14 @@ public class MiniDFSCluster { } /** - * Get a storage directory for a datanode. There are two storage directories - * per datanode: + * Get a storage directory for a datanode. *
    *
  1. /data/data<2*dnIndex + 1>
  2. *
  3. /data/data<2*dnIndex + 2>
  4. *
* * @param dnIndex datanode index (starts from 0) - * @param dirIndex directory index (0 or 1). Index 0 provides access to the - * first storage directory. Index 1 provides access to the second - * storage directory. + * @param dirIndex directory index. * @return Storage directory */ public static File getStorageDir(int dnIndex, int dirIndex) { @@ -2165,7 +2162,7 @@ public class MiniDFSCluster { * Calculate the DN instance-specific path for appending to the base dir * to determine the location of the storage of a DN instance in the mini cluster * @param dnIndex datanode index - * @param dirIndex directory index (0 or 1). + * @param dirIndex directory index. * @return */ private static String getStorageDirPath(int dnIndex, int dirIndex) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java index 1dbf207e4d3..6056a7d8c60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java @@ -71,7 +71,15 @@ import org.mockito.invocation.InvocationOnMock; /** * This test simulates a variety of situations when blocks are being * intentionally corrupted, unexpectedly modified, and so on before a block - * report is happening + * report is happening. + * + * For each test case it runs two variations: + * #1 - For a given DN, the first variation sends block reports for all + * storages in a single call to the NN. + * #2 - For a given DN, the second variation sends block reports for each + * storage in a separate call. + * + * The behavior should be the same in either variation. */ public class TestBlockReport { public static final Log LOG = LogFactory.getLog(TestBlockReport.class); @@ -157,6 +165,113 @@ public class TestBlockReport { return reports; } + /** + * Utility routine to send block reports to the NN, either in a single call + * or reporting one storage per call. + * + * @param dnR + * @param poolId + * @param reports + * @param needtoSplit + * @throws IOException + */ + private void sendBlockReports(DatanodeRegistration dnR, String poolId, + StorageBlockReport[] reports, boolean needtoSplit) throws IOException { + if (!needtoSplit) { + LOG.info("Sending combined block reports for " + dnR); + cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + } else { + for (StorageBlockReport report : reports) { + LOG.info("Sending block report for storage " + report.getStorage().getStorageID()); + StorageBlockReport[] singletonReport = { report }; + cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport); + } + } + } + + /** + * Test variations blockReport_01 through blockReport_09 with combined + * and split block reports. + */ + @Test + public void blockReportCombined_01() throws IOException { + blockReport_01(false); + } + + @Test + public void blockReportSplit_01() throws IOException { + blockReport_01(true); + } + + @Test + public void blockReportCombined_02() throws IOException { + blockReport_02(false); + } + + @Test + public void blockReportSplit_02() throws IOException { + blockReport_02(true); + } + + @Test + public void blockReportCombined_03() throws IOException { + blockReport_03(false); + } + + @Test + public void blockReportSplit_03() throws IOException { + blockReport_03(true); + } + + @Test + public void blockReportCombined_04() throws IOException { + blockReport_04(false); + } + + @Test + public void blockReportSplit_04() throws IOException { + blockReport_04(true); + } + + @Test + public void blockReportCombined_06() throws Exception { + blockReport_06(false); + } + + @Test + public void blockReportSplit_06() throws Exception { + blockReport_06(true); + } + + @Test + public void blockReportCombined_07() throws Exception { + blockReport_07(false); + } + + @Test + public void blockReportSplit_07() throws Exception { + blockReport_07(true); + } + + @Test + public void blockReportCombined_08() throws Exception { + blockReport_08(false); + } + + @Test + public void blockReportSplit_08() throws Exception { + blockReport_08(true); + } + + @Test + public void blockReportCombined_09() throws Exception { + blockReport_09(false); + } + + @Test + public void blockReportSplit_09() throws Exception { + blockReport_09(true); + } /** * Test write a file, verifies and closes it. Then the length of the blocks * are messed up and BlockReport is forced. @@ -164,8 +279,7 @@ public class TestBlockReport { * * @throws java.io.IOException on an error */ - @Test - public void blockReport_01() throws IOException { + private void blockReport_01(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); @@ -198,7 +312,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); List blocksAfterReport = DFSTestUtil.getAllBlocks(fs.open(filePath)); @@ -224,8 +338,7 @@ public class TestBlockReport { * * @throws IOException in case of errors */ - @Test - public void blockReport_02() throws IOException { + private void blockReport_02(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); LOG.info("Running test " + METHOD_NAME); @@ -280,7 +393,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem() .getBlockManager()); @@ -301,8 +414,7 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - @Test - public void blockReport_03() throws IOException { + private void blockReport_03(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); ArrayList blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath); @@ -312,11 +424,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false); - DatanodeCommand dnCmd = - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); - if(LOG.isDebugEnabled()) { - LOG.debug("Got the command: " + dnCmd); - } + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -333,8 +441,7 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - @Test - public void blockReport_04() throws IOException { + private void blockReport_04(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); DFSTestUtil.createFile(fs, filePath, @@ -352,11 +459,7 @@ public class TestBlockReport { DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - DatanodeCommand dnCmd = - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); - if(LOG.isDebugEnabled()) { - LOG.debug("Got the command: " + dnCmd); - } + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -373,8 +476,7 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - @Test - public void blockReport_06() throws Exception { + private void blockReport_06(boolean splitBlockReports) throws Exception { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -387,7 +489,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertEquals("Wrong number of PendingReplication Blocks", 0, cluster.getNamesystem().getUnderReplicatedBlocks()); @@ -406,8 +508,7 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - @Test - public void blockReport_07() throws Exception { + private void blockReport_07(boolean splitBlockReports) throws Exception { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -421,7 +522,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -432,7 +533,7 @@ public class TestBlockReport { cluster.getNamesystem().getPendingReplicationBlocks(), is(0L)); reports = getBlockReports(dn, poolId, true, true); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -458,8 +559,7 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - @Test - public void blockReport_08() throws IOException { + private void blockReport_08(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -483,8 +583,8 @@ public class TestBlockReport { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - StorageBlockReport[] report = getBlockReports(dn, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, report); + StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertEquals("Wrong number of PendingReplication blocks", blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks()); @@ -500,8 +600,7 @@ public class TestBlockReport { // Similar to BlockReport_08 but corrupts GS and len of the TEMPORARY's // replica block. Expect the same behaviour: NN should simply ignore this // block - @Test - public void blockReport_09() throws IOException { + private void blockReport_09(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -526,8 +625,8 @@ public class TestBlockReport { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - StorageBlockReport[] report = getBlockReports(dn, poolId, true, true); - cluster.getNameNodeRpc().blockReport(dnR, poolId, report); + StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertEquals("Wrong number of PendingReplication blocks", 2, cluster.getNamesystem().getPendingReplicationBlocks()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java new file mode 100644 index 00000000000..babda2f900d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.*; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * This test verifies that incremental block reports from a single DataNode are + * correctly handled by NN. Tests the following variations: + * #1 - Incremental BRs from all storages combined in a single call. + * #2 - Incremental BRs from separate storages sent in separate calls. + * + * We also verify that the DataNode is not splitting the reports (it may do so + * in the future). + */ +public class TestIncrementalBrVariations { + public static final Log LOG = LogFactory.getLog(TestIncrementalBrVariations.class); + + private static short NUM_DATANODES = 1; + static final int BLOCK_SIZE = 1024; + static final int NUM_BLOCKS = 10; + private static final long seed = 0xFACEFEEDL; + private static final String NN_METRICS = "NameNodeActivity"; + + + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private DFSClient client; + private static Configuration conf; + + static { + ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL); + ((Log4JLogger) BlockManager.blockLog).getLogger().setLevel(Level.ALL); + ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL); + ((Log4JLogger) LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL); + ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) TestIncrementalBrVariations.LOG).getLogger().setLevel(Level.ALL); + } + + @Before + public void startUpCluster() throws IOException { + conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build(); + fs = cluster.getFileSystem(); + client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), + cluster.getConfiguration(0)); + } + + @After + public void shutDownCluster() throws IOException { + client.close(); + fs.close(); + cluster.shutdownDataNodes(); + cluster.shutdown(); + } + + /** + * Incremental BRs from all storages combined in a single message. + */ + @Test + public void testCombinedIncrementalBlockReport() throws IOException { + verifyIncrementalBlockReports(false); + } + + /** + * One incremental BR per storage. + */ + @Test + public void testSplitIncrementalBlockReport() throws IOException { + verifyIncrementalBlockReports(true); + } + + private LocatedBlocks createFileGetBlocks(String filenamePrefix) throws IOException { + Path filePath = new Path("/" + filenamePrefix + ".dat"); + + // Write out a file with a few blocks, get block locations. + DFSTestUtil.createFile(fs, filePath, BLOCK_SIZE, BLOCK_SIZE * NUM_BLOCKS, + BLOCK_SIZE, NUM_DATANODES, seed); + + // Get the block list for the file with the block locations. + LocatedBlocks blocks = client.getLocatedBlocks( + filePath.toString(), 0, BLOCK_SIZE * NUM_BLOCKS); + assertThat(cluster.getNamesystem().getUnderReplicatedBlocks(), is(0L)); + return blocks; + } + + public void verifyIncrementalBlockReports(boolean splitReports) throws IOException { + // Get the block list for the file with the block locations. + LocatedBlocks blocks = createFileGetBlocks(GenericTestUtils.getMethodName()); + + // A blocks belong to the same file, hence same BP + DataNode dn = cluster.getDataNodes().get(0); + String poolId = cluster.getNamesystem().getBlockPoolId(); + DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + + // We will send 'fake' incremental block reports to the NN that look + // like they originated from DN 0. + StorageReceivedDeletedBlocks reports[] = + new StorageReceivedDeletedBlocks[dn.getFSDataset().getVolumes().size()]; + + // Lie to the NN that one block on each storage has been deleted. + for (int i = 0; i < reports.length; ++i) { + FsVolumeSpi volume = dn.getFSDataset().getVolumes().get(i); + + boolean foundBlockOnStorage = false; + ReceivedDeletedBlockInfo rdbi[] = new ReceivedDeletedBlockInfo[1]; + + // Find the first block on this storage and mark it as deleted for the + // report. + for (LocatedBlock block : blocks.getLocatedBlocks()) { + if (block.getStorageIDs()[0].equals(volume.getStorageID())) { + rdbi[0] = new ReceivedDeletedBlockInfo(block.getBlock().getLocalBlock(), + ReceivedDeletedBlockInfo.BlockStatus.DELETED_BLOCK, null); + foundBlockOnStorage = true; + break; + } + } + + assertTrue(foundBlockOnStorage); + reports[i] = new StorageReceivedDeletedBlocks(volume.getStorageID(), rdbi); + + if (splitReports) { + // If we are splitting reports then send the report for this storage now. + StorageReceivedDeletedBlocks singletonReport[] = { reports[i] }; + cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, singletonReport); + } + } + + if (!splitReports) { + // Send a combined report. + cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, reports); + } + + // Make sure that the deleted block from each storage was picked up + // by the NameNode. + assertThat(cluster.getNamesystem().getMissingBlocksCount(), is((long) reports.length)); + } + + /** + * Verify that the DataNode sends a single incremental block report for all + * storages. + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=60000) + public void testDataNodeDoesNotSplitReports() + throws IOException, InterruptedException { + LocatedBlocks blocks = createFileGetBlocks(GenericTestUtils.getMethodName()); + assertThat(cluster.getDataNodes().size(), is(1)); + DataNode dn = cluster.getDataNodes().get(0); + + // Remove all blocks from the DataNode. + for (LocatedBlock block : blocks.getLocatedBlocks()) { + dn.notifyNamenodeDeletedBlock( + block.getBlock(), block.getStorageIDs()[0]); + } + + LOG.info("Triggering report after deleting blocks"); + long ops = getLongCounter("BlockReceivedAndDeletedOps", getMetrics(NN_METRICS)); + + // Trigger a report to the NameNode and give it a few seconds. + DataNodeTestUtils.triggerBlockReport(dn); + Thread.sleep(5000); + + // Ensure that NameNodeRpcServer.blockReceivedAndDeletes is invoked + // exactly once after we triggered the report. + assertCounter("BlockReceivedAndDeletedOps", ops+1, getMetrics(NN_METRICS)); + } +}