From ae4143a529d74d94f205ca627c31360abfa11bfa Mon Sep 17 00:00:00 2001 From: Santosh Marella Date: Fri, 14 Jun 2019 10:35:33 -0700 Subject: [PATCH] HDFS-12914. Block report leases cause missing blocks until next report. Contributed by Santosh Marella, He Xiaoqiao. Signed-off-by: Wei-Chiu Chuang Co-authored-by: He Xiaoqiao --- .../server/blockmanagement/BlockManager.java | 21 ++++++++---- .../server/namenode/NameNodeRpcServer.java | 34 +++++++++++-------- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 2947b727305..8b9788a6fc2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2572,6 +2572,21 @@ public class BlockManager implements BlockStatsMXBean { } } + /** + * Check block report lease. + * @return true if lease exist and not expire + */ + public boolean checkBlockReportLease(BlockReportContext context, + final DatanodeID nodeID) throws UnregisteredNodeException { + if (context == null) { + return true; + } + DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); + final long startTime = Time.monotonicNow(); + return blockReportLeaseManager.checkLease(node, startTime, + context.getLeaseId()); + } + /** * The given storage is reporting all its blocks. * Update the (storage{@literal -->}block list) and @@ -2619,12 +2634,6 @@ public class BlockManager implements BlockStatsMXBean { blockReportLeaseManager.removeLease(node); return !node.hasStaleStorages(); } - if (context != null) { - if (!blockReportLeaseManager.checkLease(node, startTime, - context.getLeaseId())) { - return false; - } - } if (storageInfo.getBlockReportCount() == 0) { // The first block report can be processed a lot more efficiently than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 7a2a81cdf3c..31a5eb0b41a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -45,7 +45,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import com.google.common.collect.Lists; @@ -175,6 +174,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; @@ -1591,21 +1591,25 @@ public class NameNodeRpcServer implements NamenodeProtocols { } final BlockManager bm = namesystem.getBlockManager(); boolean noStaleStorages = false; - for (int r = 0; r < reports.length; r++) { - final BlockListAsLongs blocks = reports[r].getBlocks(); - // - // BlockManager.processReport accumulates information of prior calls - // for the same node and storage, so the value returned by the last - // call of this loop is the final updated value for noStaleStorage. - // - final int index = r; - noStaleStorages = bm.runBlockOp(new Callable() { - @Override - public Boolean call() throws IOException { - return bm.processReport(nodeReg, reports[index].getStorage(), - blocks, context); + try { + if (bm.checkBlockReportLease(context, nodeReg)) { + for (int r = 0; r < reports.length; r++) { + final BlockListAsLongs blocks = reports[r].getBlocks(); + // + // BlockManager.processReport accumulates information of prior calls + // for the same node and storage, so the value returned by the last + // call of this loop is the final updated value for noStaleStorage. + // + final int index = r; + noStaleStorages = bm.runBlockOp(() -> + bm.processReport(nodeReg, reports[index].getStorage(), + blocks, context)); } - }); + } + } catch (UnregisteredNodeException une) { + LOG.debug("Datanode {} is attempting to report but not register yet.", + nodeReg); + return RegisterCommand.REGISTER; } bm.removeBRLeaseIfNeeded(nodeReg, context);