HDFS-16931. Observer nn delete blocks asynchronously when tail OP_DELETE editlog
This commit is contained in:
parent
11a220c6e7
commit
4c6b9cdb12
|
@ -2757,14 +2757,16 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
* Removes the blocks from blocksmap and updates the safemode blocks total.
|
* Removes the blocks from blocksmap and updates the safemode blocks total.
|
||||||
* @param blocks An instance of {@link BlocksMapUpdateInfo} which contains a
|
* @param blocks An instance of {@link BlocksMapUpdateInfo} which contains a
|
||||||
* list of blocks that need to be removed from blocksMap
|
* list of blocks that need to be removed from blocksMap
|
||||||
|
* @param asnyc whether to delete a block asynchronously
|
||||||
*/
|
*/
|
||||||
public void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
|
public void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks,boolean deleteAsync) {
|
||||||
assert namesystem.hasWriteLock();
|
assert namesystem.hasWriteLock();
|
||||||
// In the case that we are a Standby tailing edits from the
|
// In the case that we are a Standby tailing edits from the
|
||||||
// active while in safe-mode, we need to track the total number
|
// active while in safe-mode, we need to track the total number
|
||||||
// of blocks and safe blocks in the system.
|
// of blocks and safe blocks in the system.
|
||||||
boolean trackBlockCounts = bmSafeMode.isSafeModeTrackingBlocks();
|
boolean trackBlockCounts = bmSafeMode.isSafeModeTrackingBlocks();
|
||||||
int numRemovedComplete = 0, numRemovedSafe = 0;
|
int numRemovedComplete = 0, numRemovedSafe = 0;
|
||||||
|
deleteAsync = deleteAsync && !trackBlockCounts;
|
||||||
|
|
||||||
for (BlockInfo b : blocks.getToDeleteList()) {
|
for (BlockInfo b : blocks.getToDeleteList()) {
|
||||||
if (trackBlockCounts) {
|
if (trackBlockCounts) {
|
||||||
|
@ -2775,8 +2777,15 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
removeBlock(b);
|
if (!deleteAsync){
|
||||||
|
removeBlock(b);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
// delete block async
|
||||||
|
if (deleteAsync) {
|
||||||
|
this.getMarkedDeleteQueue().add(blocks.getToDeleteList());
|
||||||
|
}
|
||||||
|
|
||||||
if (trackBlockCounts) {
|
if (trackBlockCounts) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Adjusting safe-mode totals for deletion."
|
LOG.debug("Adjusting safe-mode totals for deletion."
|
||||||
|
|
|
@ -127,8 +127,9 @@ class FSDirDeleteOp {
|
||||||
* @param fsd the FSDirectory instance
|
* @param fsd the FSDirectory instance
|
||||||
* @param iip inodes of a path to be deleted
|
* @param iip inodes of a path to be deleted
|
||||||
* @param mtime the time the inode is removed
|
* @param mtime the time the inode is removed
|
||||||
|
* @param asnyc whether to delete a block asynchronously
|
||||||
*/
|
*/
|
||||||
static void deleteForEditLog(FSDirectory fsd, INodesInPath iip, long mtime)
|
static void deleteForEditLog(FSDirectory fsd, INodesInPath iip, long mtime, boolean asnyc)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
assert fsd.hasWriteLock();
|
assert fsd.hasWriteLock();
|
||||||
FSNamesystem fsn = fsd.getFSNamesystem();
|
FSNamesystem fsn = fsd.getFSNamesystem();
|
||||||
|
@ -148,7 +149,7 @@ class FSDirDeleteOp {
|
||||||
if (filesRemoved) {
|
if (filesRemoved) {
|
||||||
fsn.removeSnapshottableDirs(snapshottableDirs);
|
fsn.removeSnapshottableDirs(snapshottableDirs);
|
||||||
fsn.removeLeasesAndINodes(removedUCFiles, removedINodes, false);
|
fsn.removeLeasesAndINodes(removedUCFiles, removedINodes, false);
|
||||||
fsn.getBlockManager().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
|
fsn.getBlockManager().removeBlocksAndUpdateSafemodeTotal(collectedBlocks,asnyc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -346,7 +346,7 @@ class FSDirRenameOp {
|
||||||
collectedBlocks, options);
|
collectedBlocks, options);
|
||||||
if (!collectedBlocks.getToDeleteList().isEmpty()) {
|
if (!collectedBlocks.getToDeleteList().isEmpty()) {
|
||||||
fsd.getFSNamesystem().getBlockManager()
|
fsd.getFSNamesystem().getBlockManager()
|
||||||
.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
|
.removeBlocksAndUpdateSafemodeTotal(collectedBlocks,false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -198,7 +198,7 @@ final class FSDirTruncateOp {
|
||||||
}
|
}
|
||||||
assert onBlockBoundary == (truncateBlock == null) :
|
assert onBlockBoundary == (truncateBlock == null) :
|
||||||
"truncateBlock is null iff on block boundary: " + truncateBlock;
|
"truncateBlock is null iff on block boundary: " + truncateBlock;
|
||||||
fsn.getBlockManager().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
|
fsn.getBlockManager().removeBlocksAndUpdateSafemodeTotal(collectedBlocks,false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -411,7 +411,7 @@ public class FSEditLogLoader {
|
||||||
INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
|
INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
|
||||||
if (oldFile != null && addCloseOp.overwrite) {
|
if (oldFile != null && addCloseOp.overwrite) {
|
||||||
// This is OP_ADD with overwrite
|
// This is OP_ADD with overwrite
|
||||||
FSDirDeleteOp.deleteForEditLog(fsDir, iip, addCloseOp.mtime);
|
FSDirDeleteOp.deleteForEditLog(fsDir, iip, addCloseOp.mtime,false);
|
||||||
iip = INodesInPath.replace(iip, iip.length() - 1, null);
|
iip = INodesInPath.replace(iip, iip.length() - 1, null);
|
||||||
oldFile = null;
|
oldFile = null;
|
||||||
}
|
}
|
||||||
|
@ -627,7 +627,7 @@ public class FSEditLogLoader {
|
||||||
final String src = renameReservedPathsOnUpgrade(
|
final String src = renameReservedPathsOnUpgrade(
|
||||||
deleteOp.path, logVersion);
|
deleteOp.path, logVersion);
|
||||||
final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE_LINK);
|
final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE_LINK);
|
||||||
FSDirDeleteOp.deleteForEditLog(fsDir, iip, deleteOp.timestamp);
|
FSDirDeleteOp.deleteForEditLog(fsDir, iip, deleteOp.timestamp,true);
|
||||||
|
|
||||||
if (toAddRetryCache) {
|
if (toAddRetryCache) {
|
||||||
fsNamesys.addCacheEntry(deleteOp.rpcClientId, deleteOp.rpcCallId);
|
fsNamesys.addCacheEntry(deleteOp.rpcClientId, deleteOp.rpcCallId);
|
||||||
|
@ -823,7 +823,7 @@ public class FSEditLogLoader {
|
||||||
new INode.ReclaimContext(fsNamesys.dir.getBlockStoragePolicySuite(),
|
new INode.ReclaimContext(fsNamesys.dir.getBlockStoragePolicySuite(),
|
||||||
collectedBlocks, removedINodes, null), deleteSnapshotOp.mtime);
|
collectedBlocks, removedINodes, null), deleteSnapshotOp.mtime);
|
||||||
fsNamesys.getBlockManager().removeBlocksAndUpdateSafemodeTotal(
|
fsNamesys.getBlockManager().removeBlocksAndUpdateSafemodeTotal(
|
||||||
collectedBlocks);
|
collectedBlocks,false);
|
||||||
collectedBlocks.clear();
|
collectedBlocks.clear();
|
||||||
fsNamesys.dir.removeFromInodeMap(removedINodes);
|
fsNamesys.dir.removeFromInodeMap(removedINodes);
|
||||||
removedINodes.clear();
|
removedINodes.clear();
|
||||||
|
|
Loading…
Reference in New Issue