From 8ed6a8a5b27e5e6c2d069cd28ecd5d70392caa03 Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Fri, 31 Jan 2014 21:30:07 +0000 Subject: [PATCH] HDFS-5153. Merging r1563254 from trunk to branch-2. (Arpit Agarwal) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1563261 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + .../server/blockmanagement/BlockManager.java | 29 ++- .../blockmanagement/DatanodeDescriptor.java | 11 + .../hdfs/server/datanode/BPServiceActor.java | 169 ++++++------ .../hadoop/hdfs/server/datanode/DNConf.java | 5 + .../server/namenode/NameNodeRpcServer.java | 9 +- .../src/main/resources/hdfs-default.xml | 14 + ...ckReport.java => BlockReportTestBase.java} | 240 ++++++------------ ...stDnRespectsBlockReportSplitThreshold.java | 205 +++++++++++++++ .../TestNNHandlesBlockReportPerStorage.java | 42 +++ .../TestNNHandlesCombinedBlockReport.java | 39 +++ 12 files changed, 518 insertions(+), 250 deletions(-) rename hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/{TestBlockReport.java => BlockReportTestBase.java} (85%) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ab4d27c66ff..8296f85e040 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -11,6 +11,9 @@ Release 2.4.0 - UNRELEASED HDFS-5781. Use an array to record the mapping between FSEditLogOpCode and the corresponding byte value. (jing9) + HDFS-5153. Datanode should send block reports for each storage in a + separate message. (Arpit Agarwal) + OPTIMIZATIONS HDFS-5790. 
LeaseManager.findPath is very slow when many leases need recovery diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 7c9c5cd2628..90a5833ba15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -399,6 +399,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 60 * 60 * 1000; public static final String DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay"; public static final int DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0; + public static final String DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY = "dfs.blockreport.split.threshold"; + public static final long DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT = 1000 * 1000; public static final String DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec"; public static final long DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000; public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index ee59d829b13..b59104fb4c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1621,15 +1621,19 @@ public class BlockManager { /** * The given storage is reporting all its blocks. * Update the (storage-->block list) and (block-->storage list) maps. + * + * @return true if all known storages of the given DN have finished reporting. + * @throws IOException */ - public void processReport(final DatanodeID nodeID, + public boolean processReport(final DatanodeID nodeID, final DatanodeStorage storage, final String poolId, final BlockListAsLongs newReport) throws IOException { namesystem.writeLock(); final long startTime = Time.now(); //after acquiring write lock final long endTime; + DatanodeDescriptor node; try { - final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); + node = datanodeManager.getDatanode(nodeID); if (node == null || !node.isAlive) { throw new IOException( "ProcessReport from dead or unregistered node: " + nodeID); @@ -1637,13 +1641,21 @@ public class BlockManager { // To minimize startup time, we discard any second (or later) block reports // that we receive while still in startup phase. - final DatanodeStorageInfo storageInfo = node.updateStorage(storage); + DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID()); + + if (storageInfo == null) { + // We handle this for backwards compatibility. + storageInfo = node.updateStorage(storage); + LOG.warn("Unknown storageId " + storage.getStorageID() + + ", updating storageMap. 
This indicates a buggy " + + "DataNode that isn't heartbeating correctly."); + } if (namesystem.isInStartupSafeMode() && storageInfo.getBlockReportCount() > 0) { blockLog.info("BLOCK* processReport: " + "discarded non-initial block report from " + nodeID + " because namenode still in startup phase"); - return; + return !node.hasStaleStorages(); } if (storageInfo.numBlocks() == 0) { @@ -1660,7 +1672,7 @@ public class BlockManager { storageInfo.receivedBlockReport(); if (staleBefore && !storageInfo.areBlockContentsStale()) { LOG.info("BLOCK* processReport: Received first block report from " - + node + " after starting up or becoming active. Its block " + + storage + " after starting up or becoming active. Its block " + "contents are no longer considered stale"); rescanPostponedMisreplicatedBlocks(); } @@ -1675,9 +1687,10 @@ public class BlockManager { if (metrics != null) { metrics.addBlockReport((int) (endTime - startTime)); } - blockLog.info("BLOCK* processReport: from " - + nodeID + ", blocks: " + newReport.getNumberOfBlocks() + blockLog.info("BLOCK* processReport: from storage " + storage.getStorageID() + + " node " + nodeID + ", blocks: " + newReport.getNumberOfBlocks() + ", processing time: " + (endTime - startTime) + " msecs"); + return !node.hasStaleStorages(); } /** @@ -1832,7 +1845,7 @@ public class BlockManager { Collection toCorrupt, // add to corrupt replicas list Collection toUC) { // add to under-construction list - final DatanodeStorageInfo storageInfo = dn.updateStorage(storage); + final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID()); // place a delimiter in the list which separates blocks // that have been reported from those that have not diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 607db6f7b84..0a8391b8b15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -259,6 +259,17 @@ public class DatanodeDescriptor extends DatanodeInfo { } } + boolean hasStaleStorages() { + synchronized (storageMap) { + for (DatanodeStorageInfo storage : storageMap.values()) { + if (storage.areBlockContentsStale()) { + return true; + } + } + return false; + } + } + /** * Remove block from the list of blocks belonging to the data-node. Remove * data-node from the block. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index c91fca381fa..a1c2cb2f50c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -22,11 +22,9 @@ import static org.apache.hadoop.util.Time.now;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -437,75 +435,100 @@ class BPServiceActor implements Runnable {
   /**
    * Report the list blocks to the Namenode
+   * @return DatanodeCommands returned by the NN. May be null.
    * @throws IOException
    */
-  DatanodeCommand blockReport() throws IOException {
+  List<DatanodeCommand> blockReport() throws IOException {
     // send block report if timer has expired.
-    DatanodeCommand cmd = null;
-    long startTime = now();
-    if (startTime - lastBlockReport > dnConf.blockReportInterval) {
-
-      // Flush any block information that precedes the block report. Otherwise
-      // we have a chance that we will miss the delHint information
-      // or we will report an RBW replica after the BlockReport already reports
-      // a FINALIZED one.
-      reportReceivedDeletedBlocks();
-
-      // Send one block report per known storage.
-
-      // Create block report
-      long brCreateStartTime = now();
-      long totalBlockCount = 0;
-
-      Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
-          dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
-
-      // Send block report
-      long brSendStartTime = now();
-      StorageBlockReport[] reports =
-          new StorageBlockReport[perVolumeBlockLists.size()];
-
-      int i = 0;
-      for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
-        DatanodeStorage dnStorage = kvPair.getKey();
-        BlockListAsLongs blockList = kvPair.getValue();
-        totalBlockCount += blockList.getNumberOfBlocks();
-
-        reports[i++] =
-            new StorageBlockReport(
-                dnStorage, blockList.getBlockListAsLongs());
-      }
-
-      cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
-
-      // Log the block report processing stats from Datanode perspective
-      long brSendCost = now() - brSendStartTime;
-      long brCreateCost = brSendStartTime - brCreateStartTime;
-      dn.getMetrics().addBlockReport(brSendCost);
-      LOG.info("BlockReport of " + totalBlockCount
-          + " blocks took " + brCreateCost + " msec to generate and "
-          + brSendCost + " msecs for RPC and NN processing");
-
-      // If we have sent the first block report, then wait a random
-      // time before we start the periodic block reports.
-      if (resetBlockReportTime) {
-        lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
-        resetBlockReportTime = false;
-      } else {
-        /* say the last block report was at 8:20:14. The current report
-         * should have started around 9:20:14 (default 1 hour interval).
-         * If current time is :
-         *   1) normal like 9:20:18, next report should be at 10:20:14
-         *   2) unexpected like 11:35:43, next report should be at 12:20:14
-         */
-        lastBlockReport += (now() - lastBlockReport) /
-            dnConf.blockReportInterval * dnConf.blockReportInterval;
-      }
-      LOG.info("sent block report, processed command:" + cmd);
+    final long startTime = now();
+    if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
+      return null;
     }
-    return cmd;
+
+    ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
+
+    // Flush any block information that precedes the block report. Otherwise
+    // we have a chance that we will miss the delHint information
+    // or we will report an RBW replica after the BlockReport already reports
+    // a FINALIZED one.
+    reportReceivedDeletedBlocks();
+    lastDeletedReport = startTime;
+
+    long brCreateStartTime = now();
+    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
+        dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
+
+    // Convert the reports to the format expected by the NN.
+    int i = 0;
+    int totalBlockCount = 0;
+    StorageBlockReport reports[] =
+        new StorageBlockReport[perVolumeBlockLists.size()];
+
+    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
+      BlockListAsLongs blockList = kvPair.getValue();
+      reports[i++] = new StorageBlockReport(
+          kvPair.getKey(), blockList.getBlockListAsLongs());
+      totalBlockCount += blockList.getNumberOfBlocks();
+    }
+
+    // Send the reports to the NN.
+    int numReportsSent;
+    long brSendStartTime = now();
+    if (totalBlockCount < dnConf.blockReportSplitThreshold) {
+      // Below split threshold, send all reports in a single message.
+      numReportsSent = 1;
+      DatanodeCommand cmd =
+          bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
+      if (cmd != null) {
+        cmds.add(cmd);
+      }
+    } else {
+      // Send one block report per message.
+      numReportsSent = i;
+      for (StorageBlockReport report : reports) {
+        StorageBlockReport singleReport[] = { report };
+        DatanodeCommand cmd = bpNamenode.blockReport(
+            bpRegistration, bpos.getBlockPoolId(), singleReport);
+        if (cmd != null) {
+          cmds.add(cmd);
+        }
+      }
+    }
+
+    // Log the block report processing stats from Datanode perspective
+    long brSendCost = now() - brSendStartTime;
+    long brCreateCost = brSendStartTime - brCreateStartTime;
+    dn.getMetrics().addBlockReport(brSendCost);
+    LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +
+        " blocks total. Took " + brCreateCost +
+        " msec to generate and " + brSendCost +
+        " msecs for RPC and NN processing. " +
+        " Got back commands " +
+        (cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds)));
+
+    scheduleNextBlockReport(startTime);
+    return cmds.size() == 0 ? null : cmds;
   }
-  
+
+  private void scheduleNextBlockReport(long previousReportStartTime) {
+    // If we have sent the first set of block reports, then wait a random
+    // time before we start the periodic block reports.
+    if (resetBlockReportTime) {
+      lastBlockReport = previousReportStartTime -
+          DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
+      resetBlockReportTime = false;
+    } else {
+      /* say the last block report was at 8:20:14. The current report
+       * should have started around 9:20:14 (default 1 hour interval).
+       * If current time is :
+       *   1) normal like 9:20:18, next report should be at 10:20:14
+       *   2) unexpected like 11:35:43, next report should be at 12:20:14
+       */
+      lastBlockReport += (now() - lastBlockReport) /
+          dnConf.blockReportInterval * dnConf.blockReportInterval;
+    }
+  }
+
   DatanodeCommand cacheReport() throws IOException {
     // If caching is disabled, do not send a cache report
     if (dn.getFSDataset().getCacheCapacity() == 0) {
@@ -513,7 +536,7 @@ class BPServiceActor implements Runnable {
     }
     // send cache report if timer has expired.
     DatanodeCommand cmd = null;
-    long startTime = Time.monotonicNow();
+    final long startTime = Time.monotonicNow();
     if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending cacheReport from service actor: " + this);
@@ -613,7 +636,7 @@ class BPServiceActor implements Runnable {
    //
    while (shouldRun()) {
      try {
-        long startTime = now();
+        final long startTime = now();
 
        //
        // Every so often, send heartbeat or block-report
@@ -659,10 +682,10 @@ class BPServiceActor implements Runnable {
            lastDeletedReport = startTime;
          }
 
-          DatanodeCommand cmd = blockReport();
-          processCommand(new DatanodeCommand[]{ cmd });
+          List<DatanodeCommand> cmds = blockReport();
+          processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
 
-          cmd = cacheReport();
+          DatanodeCommand cmd = cacheReport();
          processCommand(new DatanodeCommand[]{ cmd });
 
          // Now safe to start scanning the block pool.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 5d7afc7ecfd..73f3661182f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -70,6 +72,7 @@ public class DNConf {
   final long readaheadLength;
   final long heartBeatInterval;
   final long blockReportInterval;
+  final long blockReportSplitThreshold;
   final long deleteReportInterval;
   final long initialBlockReportDelay;
   final long cacheReportInterval;
@@ -117,6 +120,8 @@
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
+        DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
     this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
         DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 0f641e48785..3eecc104c6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -978,13 +978,18 @@
           + "from " + nodeReg + ", reports.length=" + reports.length);
     }
     final BlockManager bm = namesystem.getBlockManager();
+    boolean hasStaleStorages = true;
     for(StorageBlockReport r : reports) {
       final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
-      bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
+      hasStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
     }
 
-    if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState())
+    if (nn.getFSImage().isUpgradeFinalized() &&
+        !nn.isStandbyState() &&
+        !hasStaleStorages) {
       return new FinalizeCommand(poolId);
+    }
+
     return null;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 24f0b03c0b1..b0019ccb61d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -482,6 +482,20 @@
   <description>Delay for first block report in seconds.</description>
 </property>
 
+<property>
+  <name>dfs.blockreport.split.threshold</name>
+  <value>1000000</value>
+  <description>If the number of blocks on the DataNode is below this
+  threshold then it will send block reports for all Storage Directories
+  in a single message.
+
+  If the number of blocks exceeds this threshold then the DataNode will
+  send block reports for each Storage Directory in separate messages.
+
+  Set to zero to always split.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.directoryscan.interval</name>
   <value>21600</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
similarity index 85%
rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 6056a7d8c60..bf74b321c64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -47,12 +47,11 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -69,20 +68,16 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 
 /**
- * This test simulates a variety of situations when blocks are being
- * intentionally corrupted, unexpectedly modified, and so on before a block
- * report is happening.
+ * This is the base class for simulating a variety of situations
+ * when blocks are being intentionally corrupted, unexpectedly modified,
+ * and so on before a block report is happening.
  *
- * For each test case it runs two variations:
- * #1 - For a given DN, the first variation sends block reports for all
- *      storages in a single call to the NN.
- * #2 - For a given DN, the second variation sends block reports for each
- *      storage in a separate call.
- *
- * The behavior should be the same in either variation.
+ * By overriding {@link #sendBlockReports}, derived classes can test
+ * different variations of how block reports are split across storages
+ * and messages.
*/ -public class TestBlockReport { - public static final Log LOG = LogFactory.getLog(TestBlockReport.class); +public abstract class BlockReportTestBase { + public static final Log LOG = LogFactory.getLog(BlockReportTestBase.class); private static short REPL_FACTOR = 1; private static final int RAND_LIMIT = 2000; @@ -91,12 +86,11 @@ public class TestBlockReport { private static final int DN_N0 = 0; private static final int FILE_START = 0; - static final int BLOCK_SIZE = 1024; - static final int NUM_BLOCKS = 10; - static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1; - static String bpid; + private static final int BLOCK_SIZE = 1024; + private static final int NUM_BLOCKS = 10; + private static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1; - private MiniDFSCluster cluster; + protected MiniDFSCluster cluster; private DistributedFileSystem fs; private static Random rand = new Random(RAND_LIMIT); @@ -112,8 +106,7 @@ public class TestBlockReport { public void startUpCluster() throws IOException { REPL_FACTOR = 1; //Reset if case a test has modified the value cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build(); - fs = (DistributedFileSystem) cluster.getFileSystem(); - bpid = cluster.getNamesystem().getBlockPoolId(); + fs = cluster.getFileSystem(); } @After @@ -123,6 +116,15 @@ public class TestBlockReport { cluster.shutdown(); } + protected static void resetConfiguration() { + conf = new Configuration(); + int customPerChecksumSize = 512; + int customBlockSize = customPerChecksumSize * 3; + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); + conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, DN_RESCAN_INTERVAL); + } + // Generate a block report, optionally corrupting the generation // stamp and/or length of one block. private static StorageBlockReport[] getBlockReports( @@ -172,106 +174,11 @@ public class TestBlockReport { * @param dnR * @param poolId * @param reports - * @param needtoSplit * @throws IOException */ - private void sendBlockReports(DatanodeRegistration dnR, String poolId, - StorageBlockReport[] reports, boolean needtoSplit) throws IOException { - if (!needtoSplit) { - LOG.info("Sending combined block reports for " + dnR); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); - } else { - for (StorageBlockReport report : reports) { - LOG.info("Sending block report for storage " + report.getStorage().getStorageID()); - StorageBlockReport[] singletonReport = { report }; - cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport); - } - } - } + protected abstract void sendBlockReports(DatanodeRegistration dnR, String poolId, + StorageBlockReport[] reports) throws IOException; - /** - * Test variations blockReport_01 through blockReport_09 with combined - * and split block reports. 
- */ - @Test - public void blockReportCombined_01() throws IOException { - blockReport_01(false); - } - - @Test - public void blockReportSplit_01() throws IOException { - blockReport_01(true); - } - - @Test - public void blockReportCombined_02() throws IOException { - blockReport_02(false); - } - - @Test - public void blockReportSplit_02() throws IOException { - blockReport_02(true); - } - - @Test - public void blockReportCombined_03() throws IOException { - blockReport_03(false); - } - - @Test - public void blockReportSplit_03() throws IOException { - blockReport_03(true); - } - - @Test - public void blockReportCombined_04() throws IOException { - blockReport_04(false); - } - - @Test - public void blockReportSplit_04() throws IOException { - blockReport_04(true); - } - - @Test - public void blockReportCombined_06() throws Exception { - blockReport_06(false); - } - - @Test - public void blockReportSplit_06() throws Exception { - blockReport_06(true); - } - - @Test - public void blockReportCombined_07() throws Exception { - blockReport_07(false); - } - - @Test - public void blockReportSplit_07() throws Exception { - blockReport_07(true); - } - - @Test - public void blockReportCombined_08() throws Exception { - blockReport_08(false); - } - - @Test - public void blockReportSplit_08() throws Exception { - blockReport_08(true); - } - - @Test - public void blockReportCombined_09() throws Exception { - blockReport_09(false); - } - - @Test - public void blockReportSplit_09() throws Exception { - blockReport_09(true); - } /** * Test write a file, verifies and closes it. Then the length of the blocks * are messed up and BlockReport is forced. @@ -279,7 +186,8 @@ public class TestBlockReport { * * @throws java.io.IOException on an error */ - private void blockReport_01(boolean splitBlockReports) throws IOException { + @Test(timeout=300000) + public void blockReport_01() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); @@ -312,7 +220,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - sendBlockReports(dnR, poolId, reports, splitBlockReports); + sendBlockReports(dnR, poolId, reports); List blocksAfterReport = DFSTestUtil.getAllBlocks(fs.open(filePath)); @@ -338,7 +246,8 @@ public class TestBlockReport { * * @throws IOException in case of errors */ - private void blockReport_02(boolean splitBlockReports) throws IOException { + @Test(timeout=300000) + public void blockReport_02() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); LOG.info("Running test " + METHOD_NAME); @@ -393,7 +302,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false); - sendBlockReports(dnR, poolId, reports, splitBlockReports); + sendBlockReports(dnR, poolId, reports); BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem() .getBlockManager()); @@ -414,17 +323,18 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - private void blockReport_03(boolean splitBlockReports) throws IOException { + @Test(timeout=300000) + public void blockReport_03() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); 
Path filePath = new Path("/" + METHOD_NAME + ".dat"); ArrayList blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath); - + // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N0); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false); - sendBlockReports(dnR, poolId, reports, splitBlockReports); + sendBlockReports(dnR, poolId, reports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -441,7 +351,8 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - private void blockReport_04(boolean splitBlockReports) throws IOException { + @Test(timeout=300000) + public void blockReport_04() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); DFSTestUtil.createFile(fs, filePath, @@ -459,7 +370,7 @@ public class TestBlockReport { DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - sendBlockReports(dnR, poolId, reports, splitBlockReports); + sendBlockReports(dnR, poolId, reports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -476,7 +387,8 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - private void blockReport_06(boolean splitBlockReports) throws Exception { + @Test(timeout=300000) + public void blockReport_06() throws Exception { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -489,7 +401,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - sendBlockReports(dnR, poolId, reports, splitBlockReports); + sendBlockReports(dnR, poolId, reports); printStats(); assertEquals("Wrong number of PendingReplication Blocks", 0, cluster.getNamesystem().getUnderReplicatedBlocks()); @@ -508,7 +420,8 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - private void blockReport_07(boolean splitBlockReports) throws Exception { + @Test(timeout=300000) + public void blockReport_07() throws Exception { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -522,7 +435,7 @@ public class TestBlockReport { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false); - sendBlockReports(dnR, poolId, reports, splitBlockReports); + sendBlockReports(dnR, poolId, reports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -533,7 +446,7 @@ public class TestBlockReport { cluster.getNamesystem().getPendingReplicationBlocks(), is(0L)); reports = getBlockReports(dn, poolId, true, true); - sendBlockReports(dnR, poolId, reports, splitBlockReports); + sendBlockReports(dnR, poolId, reports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -559,7 +472,8 @@ public class TestBlockReport { * * @throws IOException in case of an error */ - private void blockReport_08(boolean splitBlockReports) throws IOException { + @Test(timeout=300000) + public void 
blockReport_08() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -578,13 +492,13 @@ public class TestBlockReport { bc.start(); waitForTempReplica(bl, DN_N1); - + // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - sendBlockReports(dnR, poolId, reports, splitBlockReports); + sendBlockReports(dnR, poolId, reports); printStats(); assertEquals("Wrong number of PendingReplication blocks", blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks()); @@ -600,7 +514,8 @@ public class TestBlockReport { // Similar to BlockReport_08 but corrupts GS and len of the TEMPORARY's // replica block. Expect the same behaviour: NN should simply ignore this // block - private void blockReport_09(boolean splitBlockReports) throws IOException { + @Test(timeout=300000) + public void blockReport_09() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -620,17 +535,17 @@ public class TestBlockReport { bc.start(); waitForTempReplica(bl, DN_N1); - + // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true); - sendBlockReports(dnR, poolId, reports, splitBlockReports); + sendBlockReports(dnR, poolId, reports); printStats(); assertEquals("Wrong number of PendingReplication blocks", 2, cluster.getNamesystem().getPendingReplicationBlocks()); - + try { bc.join(); } catch (InterruptedException e) {} @@ -638,7 +553,7 @@ public class TestBlockReport { resetConfiguration(); // return the initial state of the configuration } } - + /** * Test for the case where one of the DNs in the pipeline is in the * process of doing a block report exactly when the block is closed. @@ -648,7 +563,7 @@ public class TestBlockReport { * corrupt. * This is a regression test for HDFS-2791. */ - @Test + @Test(timeout=300000) public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception { final CountDownLatch brFinished = new CountDownLatch(1); DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) { @@ -663,7 +578,7 @@ public class TestBlockReport { } } }; - + final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); @@ -671,9 +586,9 @@ public class TestBlockReport { // what happens when one of the DNs is slowed for some reason. REPL_FACTOR = 2; startDNandWait(null, false); - + NameNode nn = cluster.getNameNode(); - + FSDataOutputStream out = fs.create(filePath, REPL_FACTOR); try { AppendTestUtil.write(out, 0, 10); @@ -682,21 +597,21 @@ public class TestBlockReport { // Set up a spy so that we can delay the block report coming // from this node. 
DataNode dn = cluster.getDataNodes().get(0); - DatanodeProtocol spy = + DatanodeProtocolClientSideTranslatorPB spy = DataNodeTestUtils.spyOnBposToNN(dn, nn); - + Mockito.doAnswer(delayer) .when(spy).blockReport( Mockito.anyObject(), Mockito.anyString(), Mockito.anyObject()); - + // Force a block report to be generated. The block report will have // an RBW replica in it. Wait for the RPC to be sent, but block // it before it gets to the NN. dn.scheduleAllBlockReport(0); delayer.waitForCall(); - + } finally { IOUtils.closeStream(out); } @@ -705,22 +620,22 @@ public class TestBlockReport { // state. delayer.proceed(); brFinished.await(); - + // Verify that no replicas are marked corrupt, and that the // file is still readable. BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager()); assertEquals(0, nn.getNamesystem().getCorruptReplicaBlocks()); DFSTestUtil.readFile(fs, filePath); - + // Ensure that the file is readable even from the DN that we futzed with. cluster.stopDataNode(1); - DFSTestUtil.readFile(fs, filePath); + DFSTestUtil.readFile(fs, filePath); } private void waitForTempReplica(Block bl, int DN_N1) throws IOException { final boolean tooLongWait = false; final int TIMEOUT = 40000; - + if(LOG.isDebugEnabled()) { LOG.debug("Wait for datanode " + DN_N1 + " to appear"); } @@ -731,7 +646,7 @@ public class TestBlockReport { LOG.debug("Total number of DNs " + cluster.getDataNodes().size()); } cluster.waitActive(); - + // Look about specified DN for the replica of the block from 1st DN final DataNode dn1 = cluster.getDataNodes().get(DN_N1); String bpid = cluster.getNamesystem().getBlockPoolId(); @@ -789,7 +704,7 @@ public class TestBlockReport { return blocks; } - private void startDNandWait(Path filePath, boolean waitReplicas) + private void startDNandWait(Path filePath, boolean waitReplicas) throws IOException, InterruptedException, TimeoutException { if (LOG.isDebugEnabled()) { LOG.debug("Before next DN start: " + cluster.getDataNodes().size()); @@ -802,7 +717,7 @@ public class TestBlockReport { if (LOG.isDebugEnabled()) { int lastDn = datanodes.size() - 1; LOG.debug("New datanode " - + cluster.getDataNodes().get(lastDn).getDisplayName() + + cluster.getDataNodes().get(lastDn).getDisplayName() + " has been started"); } if (waitReplicas) { @@ -898,7 +813,7 @@ public class TestBlockReport { ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL); ((Log4JLogger) LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL); ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger) TestBlockReport.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) BlockReportTestBase.LOG).getLogger().setLevel(Level.ALL); } private Block findBlock(Path path, long size) throws IOException { @@ -918,11 +833,11 @@ public class TestBlockReport { private class BlockChecker extends Thread { Path filePath; - + public BlockChecker(final Path filePath) { this.filePath = filePath; } - + @Override public void run() { try { @@ -933,13 +848,4 @@ public class TestBlockReport { } } } - - private static void resetConfiguration() { - conf = new Configuration(); - int customPerChecksumSize = 512; - int customBlockSize = customPerChecksumSize * 3; - conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); - conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, DN_RESCAN_INTERVAL); - } } diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java new file mode 100644 index 00000000000..989c33d2f09 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY; +import org.apache.hadoop.test.GenericTestUtils; + +import org.junit.After; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.times; + +/** + * Tests that the DataNode respects + * {@link DFSConfigKeys#DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY} + */ +public class TestDnRespectsBlockReportSplitThreshold { + public static final Log LOG = LogFactory.getLog(TestStorageReport.class); + + private static final int BLOCK_SIZE = 1024; + private static final short REPL_FACTOR = 1; + private static final long seed = 0xFEEDFACE; + private static final int BLOCKS_IN_FILE = 5; + + private static Configuration conf; + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + static String bpid; + + public void startUpCluster(long splitThreshold) throws IOException { + conf = new HdfsConfiguration(); + conf.setLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, splitThreshold); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(REPL_FACTOR) + .build(); + fs = cluster.getFileSystem(); + bpid = cluster.getNamesystem().getBlockPoolId(); + } + + @After + public void shutDownCluster() throws IOException { + if (cluster != null) { + fs.close(); + cluster.shutdown(); + cluster = null; + } + } + + private void createFile(String filenamePrefix, int blockCount) + throws IOException { + Path path = new Path("/" + 
filenamePrefix + ".dat"); + DFSTestUtil.createFile(fs, path, BLOCK_SIZE, + blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed); + } + + private void verifyCapturedArguments( + ArgumentCaptor captor, + int expectedReportsPerCall, + int expectedTotalBlockCount) { + + List listOfReports = captor.getAllValues(); + int numBlocksReported = 0; + for (StorageBlockReport[] reports : listOfReports) { + assertThat(reports.length, is(expectedReportsPerCall)); + + for (StorageBlockReport report : reports) { + BlockListAsLongs blockList = new BlockListAsLongs(report.getBlocks()); + numBlocksReported += blockList.getNumberOfBlocks(); + } + } + + assert(numBlocksReported >= expectedTotalBlockCount); + } + + /** + * Test that if splitThreshold is zero, then we always get a separate + * call per storage. + */ + @Test(timeout=300000) + public void testAlwaysSplit() throws IOException, InterruptedException { + startUpCluster(0); + NameNode nn = cluster.getNameNode(); + DataNode dn = cluster.getDataNodes().get(0); + + // Create a file with a few blocks. + createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE); + + // Insert a spy object for the NN RPC. + DatanodeProtocolClientSideTranslatorPB nnSpy = + DataNodeTestUtils.spyOnBposToNN(dn, nn); + + // Trigger a block report so there is an interaction with the spy + // object. + DataNodeTestUtils.triggerBlockReport(dn); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(StorageBlockReport[].class); + + Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport( + any(DatanodeRegistration.class), + anyString(), + captor.capture()); + + verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); + } + + /** + * Tests the behavior when the count of blocks is exactly one less than + * the threshold. + */ + @Test(timeout=300000) + public void testCornerCaseUnderThreshold() throws IOException, InterruptedException { + startUpCluster(BLOCKS_IN_FILE + 1); + NameNode nn = cluster.getNameNode(); + DataNode dn = cluster.getDataNodes().get(0); + + // Create a file with a few blocks. + createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE); + + // Insert a spy object for the NN RPC. + DatanodeProtocolClientSideTranslatorPB nnSpy = + DataNodeTestUtils.spyOnBposToNN(dn, nn); + + // Trigger a block report so there is an interaction with the spy + // object. + DataNodeTestUtils.triggerBlockReport(dn); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(StorageBlockReport[].class); + + Mockito.verify(nnSpy, times(1)).blockReport( + any(DatanodeRegistration.class), + anyString(), + captor.capture()); + + verifyCapturedArguments(captor, MiniDFSCluster.DIRS_PER_DATANODE, BLOCKS_IN_FILE); + } + + /** + * Tests the behavior when the count of blocks is exactly equal to the + * threshold. + */ + @Test(timeout=300000) + public void testCornerCaseAtThreshold() throws IOException, InterruptedException { + startUpCluster(BLOCKS_IN_FILE); + NameNode nn = cluster.getNameNode(); + DataNode dn = cluster.getDataNodes().get(0); + + // Create a file with a few blocks. + createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE); + + // Insert a spy object for the NN RPC. + DatanodeProtocolClientSideTranslatorPB nnSpy = + DataNodeTestUtils.spyOnBposToNN(dn, nn); + + // Trigger a block report so there is an interaction with the spy + // object. 
+ DataNodeTestUtils.triggerBlockReport(dn); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(StorageBlockReport[].class); + + Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport( + any(DatanodeRegistration.class), + anyString(), + captor.capture()); + + verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java new file mode 100644 index 00000000000..1b03786fa39 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; + + +/** + * Runs all tests in BlockReportTestBase, sending one block per storage. + * This is the default DataNode behavior post HDFS-2832. + */ +public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase { + + @Override + protected void sendBlockReports(DatanodeRegistration dnR, String poolId, + StorageBlockReport[] reports) throws IOException { + for (StorageBlockReport report : reports) { + LOG.info("Sending block report for storage " + report.getStorage().getStorageID()); + StorageBlockReport[] singletonReport = { report }; + cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java new file mode 100644 index 00000000000..036b550c668 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; + +/** + * Runs all tests in BlockReportTestBase, sending one block report + * per DataNode. This tests that the NN can handle the legacy DN + * behavior where it presents itself as a single logical storage. + */ +public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase { + + @Override + protected void sendBlockReports(DatanodeRegistration dnR, String poolId, + StorageBlockReport[] reports) throws IOException { + LOG.info("Sending combined block reports for " + dnR); + cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + } +}