diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index fad811bc5d8..2589cafc595 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2132,6 +2132,21 @@ public class BlockManager implements BlockStatsMXBean { } } + /** + * Check block report lease. + * @return true if lease exist and not expire + */ + public boolean checkBlockReportLease(BlockReportContext context, + final DatanodeID nodeID) throws UnregisteredNodeException { + if (context == null) { + return true; + } + DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); + final long startTime = Time.monotonicNow(); + return blockReportLeaseManager.checkLease(node, startTime, + context.getLeaseId()); + } + /** * The given storage is reporting all its blocks. * Update the (storage-->block list) and (block-->storage list) maps. @@ -2175,12 +2190,6 @@ public class BlockManager implements BlockStatsMXBean { blockReportLeaseManager.removeLease(node); return !node.hasStaleStorages(); } - if (context != null) { - if (!blockReportLeaseManager.checkLease(node, startTime, - context.getLeaseId())) { - return false; - } - } if (storageInfo.getBlockReportCount() == 0) { // The first block report can be processed a lot more efficiently than @@ -2289,7 +2298,7 @@ public class BlockManager implements BlockStatsMXBean { } } - private Collection processReport( + Collection processReport( final DatanodeStorageInfo storageInfo, final BlockListAsLongs report, BlockReportContext context) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 52e2bbf72a4..992704bc6ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -425,7 +425,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, /** The namespace tree. */ FSDirectory dir; - private final BlockManager blockManager; + private BlockManager blockManager; private final SnapshotManager snapshotManager; private final CacheManager cacheManager; private final DatanodeStatistics datanodeStatistics; @@ -5815,6 +5815,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return blockManager; } + @VisibleForTesting + public void setBlockManagerForTesting(BlockManager bm) { + this.blockManager = bm; + } + /** @return the FSDirectory. */ @Override public FSDirectory getFSDirectory() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index e6d03bb0c46..830320aae36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -153,6 +153,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; @@ -1449,22 +1450,30 @@ public class NameNodeRpcServer implements NamenodeProtocols { } final BlockManager bm = namesystem.getBlockManager(); boolean noStaleStorages = false; - for (int r = 0; r < reports.length; r++) { - final BlockListAsLongs blocks = reports[r].getBlocks(); - // - // BlockManager.processReport accumulates information of prior calls - // for the same node and storage, so the value returned by the last - // call of this loop is the final updated value for noStaleStorage. - // - final int index = r; - noStaleStorages = bm.runBlockOp(new Callable() { - @Override - public Boolean call() throws IOException { - return bm.processReport(nodeReg, reports[index].getStorage(), - blocks, context); + try { + if (bm.checkBlockReportLease(context, nodeReg)) { + for (int r = 0; r < reports.length; r++) { + final BlockListAsLongs blocks = reports[r].getBlocks(); + // + // BlockManager.processReport accumulates information of prior calls + // for the same node and storage, so the value returned by the last + // call of this loop is the final updated value for noStaleStorage. + // + final int index = r; + noStaleStorages = bm.runBlockOp(new Callable() { + @Override + public Boolean call() throws IOException { + return bm.processReport(nodeReg, reports[index].getStorage(), + blocks, context); + } + }); } - }); + } metrics.incrStorageBlockReportOps(); + } catch (UnregisteredNodeException une) { + LOG.debug("Datanode {} is attempting to report but not register yet.", + nodeReg); + return RegisterCommand.REGISTER; } bm.removeBRLeaseIfNeeded(nodeReg, context); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java new file mode 100644 index 00000000000..a4f16b99c6f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; +import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +/** + * Tests that BlockReportLease in BlockManager. + */ +public class TestBlockReportLease { + private static final Log LOG = LogFactory.getLog(TestBlockReportLease.class); + /** + * Test check lease about one BlockReport with many StorageBlockReport. + * Before HDFS-12914, when batch storage report to NameNode, it will check + * less for one storage by one, So it could part storage report can + * be process normally, however, the rest storage report can not be process + * since check lease failed. + * After HDFS-12914, NameNode check lease once for every blockreport request, + * So this issue will not exist anymore. + */ + @Test + public void testCheckBlockReportLease() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + Random rand = new Random(); + + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build()) { + cluster.waitActive(); + + FSNamesystem fsn = cluster.getNamesystem(); + BlockManager blockManager = fsn.getBlockManager(); + BlockManager spyBlockManager = spy(blockManager); + fsn.setBlockManagerForTesting(spyBlockManager); + final String poolId = cluster.getNamesystem().getBlockPoolId(); + + final NamenodeProtocols rpcServer = cluster.getNameNodeRpc(); + + // Test based on one DataNode report to Namenode + DataNode dn = cluster.getDataNodes().get(0); + DatanodeDescriptor datanodeDescriptor = spyBlockManager + .getDatanodeManager().getDatanode(dn.getDatanodeId()); + + final DatanodeRegistration dnRegistration = + dn.getDNRegistrationForBP(poolId); + final StorageReport[] storages = dn.getFSDataset(). + getStorageReports(poolId); + + // Send heartbeat and request full block report lease + HeartbeatResponse hbResponse = rpcServer.sendHeartbeat( + dnRegistration, storages, 0, 0, 0, 0, 0, null, true, + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); + + DelayAnswer delayer = new DelayAnswer(LOG); + doAnswer(delayer).when(spyBlockManager).processReport( + any(DatanodeStorageInfo.class), + any(BlockListAsLongs.class), + any(BlockReportContext.class)); + + ExecutorService pool = Executors.newFixedThreadPool(1); + + // Trigger sendBlockReport + final BlockReportContext brContext = new BlockReportContext(1, 0, + rand.nextLong(), hbResponse.getFullBlockReportLeaseId()); + Future sendBRfuturea = pool.submit( + new Callable() { + @Override + public DatanodeCommand call() throws Exception { + // Build every storage with 100 blocks for sending report + DatanodeStorage[] datanodeStorages + = new DatanodeStorage[storages.length]; + for (int i = 0; i < storages.length; i++) { + datanodeStorages[i] = storages[i].getStorage(); + } + StorageBlockReport[] reports = createReports(datanodeStorages, + 100); + + // Send blockReport + return rpcServer.blockReport(dnRegistration, poolId, reports, + brContext); + } + }); + + // Wait until BlockManager calls processReport + delayer.waitForCall(); + + // Remove full block report lease about dn + spyBlockManager.getBlockReportLeaseManager() + .removeLease(datanodeDescriptor); + + // Allow blockreport to proceed + delayer.proceed(); + + // Get result, it will not null if process successfully + DatanodeCommand datanodeCommand = sendBRfuturea.get(); + assertTrue(datanodeCommand instanceof FinalizeCommand); + assertEquals(poolId, ((FinalizeCommand)datanodeCommand) + .getBlockPoolId()); + } + } + + private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages, + int numBlocks) { + int longsPerBlock = 3; + int blockListSize = 2 + numBlocks * longsPerBlock; + int numStorages = dnStorages.length; + StorageBlockReport[] storageBlockReports + = new StorageBlockReport[numStorages]; + for (int i = 0; i < numStorages; i++) { + List longs = new ArrayList(blockListSize); + longs.add(Long.valueOf(numBlocks)); + longs.add(0L); + for (int j = 0; j < blockListSize; ++j) { + longs.add(Long.valueOf(j)); + } + BlockListAsLongs blockList = BlockListAsLongs.decodeLongs(longs); + storageBlockReports[i] = new StorageBlockReport(dnStorages[i], blockList); + } + return storageBlockReports; + } +} \ No newline at end of file