HDFS-12914. Block report leases cause missing blocks until next report. Contributed by Santosh Marella, He Xiaoqiao.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
Co-authored-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
3ba090f436
commit
ae4143a529
|
@ -2572,6 +2572,21 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check the block report lease carried by a block report context.
* <p>
* The datanode descriptor is looked up from {@code nodeID} through the
* datanode manager, and the lease ID taken from {@code context} is then
* handed to the block report lease manager together with the current
* monotonic time.
*
* @param context the block report context supplying the lease ID; when
*                {@code null} no lease check is performed and the method
*                returns {@code true}
* @param nodeID  identifies the datanode whose descriptor is looked up
|
||||||
|
* @return {@code true} if {@code context} is {@code null}; otherwise the
*         result of the lease manager's check, i.e. {@code true} if the
*         lease exists and has not expired
* @throws UnregisteredNodeException declared by this method; presumably
*         propagated from the datanode lookup (confirm against
*         DatanodeManager#getDatanode)
|
||||||
|
*/
|
||||||
|
public boolean checkBlockReportLease(BlockReportContext context,
|
||||||
|
final DatanodeID nodeID) throws UnregisteredNodeException {
|
||||||
|
// Without a context there is no lease ID to verify, so the report is
// accepted unconditionally.
if (context == null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// NOTE(review): if getDatanode can return null for an unknown node, that
// null is passed straight to checkLease below -- confirm the lookup
// contract and whether a null guard is needed.
DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
|
||||||
|
// Monotonic "now" passed to the lease check; presumably the reference
// time for lease expiry -- confirm in BlockReportLeaseManager.
final long startTime = Time.monotonicNow();
|
||||||
|
return blockReportLeaseManager.checkLease(node, startTime,
|
||||||
|
context.getLeaseId());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The given storage is reporting all its blocks.
|
* The given storage is reporting all its blocks.
|
||||||
* Update the (storage{@literal -->}block list) and
|
* Update the (storage{@literal -->}block list) and
|
||||||
|
@ -2619,12 +2634,6 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
blockReportLeaseManager.removeLease(node);
|
blockReportLeaseManager.removeLease(node);
|
||||||
return !node.hasStaleStorages();
|
return !node.hasStaleStorages();
|
||||||
}
|
}
|
||||||
if (context != null) {
|
|
||||||
if (!blockReportLeaseManager.checkLease(node, startTime,
|
|
||||||
context.getLeaseId())) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (storageInfo.getBlockReportCount() == 0) {
|
if (storageInfo.getBlockReportCount() == 0) {
|
||||||
// The first block report can be processed a lot more efficiently than
|
// The first block report can be processed a lot more efficiently than
|
||||||
|
|
|
@ -45,7 +45,6 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
@ -175,6 +174,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
|
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
|
||||||
|
@ -1591,21 +1591,25 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
}
|
}
|
||||||
final BlockManager bm = namesystem.getBlockManager();
|
final BlockManager bm = namesystem.getBlockManager();
|
||||||
boolean noStaleStorages = false;
|
boolean noStaleStorages = false;
|
||||||
for (int r = 0; r < reports.length; r++) {
|
try {
|
||||||
final BlockListAsLongs blocks = reports[r].getBlocks();
|
if (bm.checkBlockReportLease(context, nodeReg)) {
|
||||||
//
|
for (int r = 0; r < reports.length; r++) {
|
||||||
// BlockManager.processReport accumulates information of prior calls
|
final BlockListAsLongs blocks = reports[r].getBlocks();
|
||||||
// for the same node and storage, so the value returned by the last
|
//
|
||||||
// call of this loop is the final updated value for noStaleStorage.
|
// BlockManager.processReport accumulates information of prior calls
|
||||||
//
|
// for the same node and storage, so the value returned by the last
|
||||||
final int index = r;
|
// call of this loop is the final updated value for noStaleStorage.
|
||||||
noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
|
//
|
||||||
@Override
|
final int index = r;
|
||||||
public Boolean call() throws IOException {
|
noStaleStorages = bm.runBlockOp(() ->
|
||||||
return bm.processReport(nodeReg, reports[index].getStorage(),
|
bm.processReport(nodeReg, reports[index].getStorage(),
|
||||||
blocks, context);
|
blocks, context));
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
} catch (UnregisteredNodeException une) {
|
||||||
|
LOG.debug("Datanode {} is attempting to report but not register yet.",
|
||||||
|
nodeReg);
|
||||||
|
return RegisterCommand.REGISTER;
|
||||||
}
|
}
|
||||||
bm.removeBRLeaseIfNeeded(nodeReg, context);
|
bm.removeBRLeaseIfNeeded(nodeReg, context);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue