From 6822193ee6d6ac8b08822fa76c89e1dd61c5ddca Mon Sep 17 00:00:00 2001 From: Santosh Marella Date: Fri, 14 Jun 2019 10:35:33 -0700 Subject: [PATCH] HDFS-12914. Block report leases cause missing blocks until next report. Contributed by Santosh Marella, He Xiaoqiao. Signed-off-by: Wei-Chiu Chuang Co-authored-by: He Xiaoqiao --- .../server/blockmanagement/BlockManager.java | 21 ++- .../server/namenode/NameNodeRpcServer.java | 34 ++-- .../blockmanagement/TestBlockReportLease.java | 159 ++++++++++++++++++ 3 files changed, 193 insertions(+), 21 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 2947b727305..8b9788a6fc2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2572,6 +2572,21 @@ public class BlockManager implements BlockStatsMXBean { } } + /** + * Check block report lease. + * @return true if lease exist and not expire + */ + public boolean checkBlockReportLease(BlockReportContext context, + final DatanodeID nodeID) throws UnregisteredNodeException { + if (context == null) { + return true; + } + DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); + final long startTime = Time.monotonicNow(); + return blockReportLeaseManager.checkLease(node, startTime, + context.getLeaseId()); + } + /** * The given storage is reporting all its blocks. * Update the (storage{@literal -->}block list) and @@ -2619,12 +2634,6 @@ public class BlockManager implements BlockStatsMXBean { blockReportLeaseManager.removeLease(node); return !node.hasStaleStorages(); } - if (context != null) { - if (!blockReportLeaseManager.checkLease(node, startTime, - context.getLeaseId())) { - return false; - } - } if (storageInfo.getBlockReportCount() == 0) { // The first block report can be processed a lot more efficiently than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 7a2a81cdf3c..31a5eb0b41a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -45,7 +45,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import com.google.common.collect.Lists; @@ -175,6 +174,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; @@ -1591,21 +1591,25 @@ public class NameNodeRpcServer implements NamenodeProtocols { } final BlockManager bm = namesystem.getBlockManager(); boolean noStaleStorages = false; - for (int r = 0; r < reports.length; r++) { - final BlockListAsLongs blocks = reports[r].getBlocks(); - // - // BlockManager.processReport accumulates information of prior calls - // for the same node and storage, so the value returned by the last - // call of this loop is the final updated value for noStaleStorage. - // - final int index = r; - noStaleStorages = bm.runBlockOp(new Callable() { - @Override - public Boolean call() throws IOException { - return bm.processReport(nodeReg, reports[index].getStorage(), - blocks, context); + try { + if (bm.checkBlockReportLease(context, nodeReg)) { + for (int r = 0; r < reports.length; r++) { + final BlockListAsLongs blocks = reports[r].getBlocks(); + // + // BlockManager.processReport accumulates information of prior calls + // for the same node and storage, so the value returned by the last + // call of this loop is the final updated value for noStaleStorage. + // + final int index = r; + noStaleStorages = bm.runBlockOp(() -> + bm.processReport(nodeReg, reports[index].getStorage(), + blocks, context)); } - }); + } + } catch (UnregisteredNodeException une) { + LOG.debug("Datanode {} is attempting to report but not register yet.", + nodeReg); + return RegisterCommand.REGISTER; } bm.removeBRLeaseIfNeeded(nodeReg, context); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java new file mode 100644 index 00000000000..40408b19244 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; +import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +/** + * Tests that BlockReportLease in BlockManager. + */ +public class TestBlockReportLease { + + /** + * Test check lease about one BlockReport with many StorageBlockReport. + * Before HDFS-12914, when batch storage report to NameNode, it will check + * less for one storage by one, So it could part storage report can + * be process normally, however, the rest storage report can not be process + * since check lease failed. + * After HDFS-12914, NameNode check lease once for every blockreport request, + * So this issue will not exist anymore. + */ + @Test + public void testCheckBlockReportLease() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + Random rand = new Random(); + + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build()) { + cluster.waitActive(); + + FSNamesystem fsn = cluster.getNamesystem(); + BlockManager blockManager = fsn.getBlockManager(); + BlockManager spyBlockManager = spy(blockManager); + fsn.setBlockManagerForTesting(spyBlockManager); + String poolId = cluster.getNamesystem().getBlockPoolId(); + + NamenodeProtocols rpcServer = cluster.getNameNodeRpc(); + + // Test based on one DataNode report to Namenode + DataNode dn = cluster.getDataNodes().get(0); + DatanodeDescriptor datanodeDescriptor = spyBlockManager + .getDatanodeManager().getDatanode(dn.getDatanodeId()); + + DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId); + StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId); + + // Send heartbeat and request full block report lease + HeartbeatResponse hbResponse = rpcServer.sendHeartbeat( + dnRegistration, storages, 0, 0, 0, 0, 0, null, true, + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); + + DelayAnswer delayer = new DelayAnswer(BlockManager.LOG); + doAnswer(delayer).when(spyBlockManager).processReport( + any(DatanodeStorageInfo.class), + any(BlockListAsLongs.class), + any(BlockReportContext.class)); + + ExecutorService pool = Executors.newFixedThreadPool(1); + + // Trigger sendBlockReport + BlockReportContext brContext = new BlockReportContext(1, 0, + rand.nextLong(), hbResponse.getFullBlockReportLeaseId(), true); + Future sendBRfuturea = pool.submit(() -> { + // Build every storage with 100 blocks for sending report + DatanodeStorage[] datanodeStorages + = new DatanodeStorage[storages.length]; + for (int i = 0; i < storages.length; i++) { + datanodeStorages[i] = storages[i].getStorage(); + } + StorageBlockReport[] reports = createReports(datanodeStorages, 100); + + // Send blockReport + return rpcServer.blockReport(dnRegistration, poolId, reports, + brContext); + }); + + // Wait until BlockManager calls processReport + delayer.waitForCall(); + + // Remove full block report lease about dn + spyBlockManager.getBlockReportLeaseManager() + .removeLease(datanodeDescriptor); + + // Allow blockreport to proceed + delayer.proceed(); + + // Get result, it will not null if process successfully + DatanodeCommand datanodeCommand = sendBRfuturea.get(); + assertTrue(datanodeCommand instanceof FinalizeCommand); + assertEquals(poolId, ((FinalizeCommand)datanodeCommand) + .getBlockPoolId()); + } + } + + private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages, + int numBlocks) { + int longsPerBlock = 3; + int blockListSize = 2 + numBlocks * longsPerBlock; + int numStorages = dnStorages.length; + StorageBlockReport[] storageBlockReports + = new StorageBlockReport[numStorages]; + for (int i = 0; i < numStorages; i++) { + List longs = new ArrayList(blockListSize); + longs.add(Long.valueOf(numBlocks)); + longs.add(0L); + for (int j = 0; j < blockListSize; ++j) { + longs.add(Long.valueOf(j)); + } + BlockListAsLongs blockList = BlockListAsLongs.decodeLongs(longs); + storageBlockReports[i] = new StorageBlockReport(dnStorages[i], blockList); + } + return storageBlockReports; + } +}