HDFS-7440. Consolidate snapshot related operations in a single class. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2014-11-25 21:03:41 -08:00
parent a655973e78
commit 4a31611829
4 changed files with 279 additions and 166 deletions

View File

@ -395,6 +395,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7436. Consolidate implementation of concat(). (wheat9) HDFS-7436. Consolidate implementation of concat(). (wheat9)
HDFS-7440. Consolidate snapshot related operations in a single class.
(wheat9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -0,0 +1,222 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
import org.apache.hadoop.hdfs.util.ChunkedArrayList;
import java.io.IOException;
import java.util.List;
class FSDirSnapshotOp {
/** Allow snapshot on a directory. */
static void allowSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
String path) throws IOException {
fsd.writeLock();
try {
snapshotManager.setSnapshottable(path, true);
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logAllowSnapshot(path);
}
static void disallowSnapshot(
FSDirectory fsd, SnapshotManager snapshotManager,
String path) throws IOException {
fsd.writeLock();
try {
snapshotManager.resetSnapshottable(path);
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logDisallowSnapshot(path);
}
/**
* Create a snapshot
* @param snapshotRoot The directory path where the snapshot is taken
* @param snapshotName The name of the snapshot
*/
static String createSnapshot(
FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
String snapshotName, boolean logRetryCache)
throws IOException {
final FSPermissionChecker pc = fsd.getPermissionChecker();
String snapshotPath = null;
if (fsd.isPermissionEnabled()) {
fsd.checkOwner(pc, snapshotRoot);
}
if (snapshotName == null || snapshotName.isEmpty()) {
snapshotName = Snapshot.generateDefaultSnapshotName();
}
if(snapshotName != null){
if (!DFSUtil.isValidNameForComponent(snapshotName)) {
throw new InvalidPathException("Invalid snapshot name: " +
snapshotName);
}
}
fsd.verifySnapshotName(snapshotName, snapshotRoot);
fsd.writeLock();
try {
snapshotPath = snapshotManager.createSnapshot(snapshotRoot, snapshotName);
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logCreateSnapshot(snapshotRoot, snapshotName,
logRetryCache);
return snapshotPath;
}
static void renameSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
String path, String snapshotOldName, String snapshotNewName,
boolean logRetryCache) throws IOException {
if (fsd.isPermissionEnabled()) {
FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.checkOwner(pc, path);
}
fsd.verifySnapshotName(snapshotNewName, path);
fsd.writeLock();
try {
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logRenameSnapshot(path, snapshotOldName,
snapshotNewName, logRetryCache);
}
static SnapshottableDirectoryStatus[] getSnapshottableDirListing(
FSDirectory fsd, SnapshotManager snapshotManager) throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.readLock();
try {
final String user = pc.isSuperUser()? null : pc.getUser();
return snapshotManager.getSnapshottableDirListing(user);
} finally {
fsd.readUnlock();
}
}
static SnapshotDiffReport getSnapshotDiffReport(
FSDirectory fsd, SnapshotManager snapshotManager, String path,
String fromSnapshot, String toSnapshot) throws IOException {
SnapshotDiffReport diffs;
final FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.readLock();
try {
if (fsd.isPermissionEnabled()) {
checkSubtreeReadPermission(fsd, pc, path, fromSnapshot);
checkSubtreeReadPermission(fsd, pc, path, toSnapshot);
}
diffs = snapshotManager.diff(path, fromSnapshot, toSnapshot);
} finally {
fsd.readUnlock();
}
return diffs;
}
/**
* Delete a snapshot of a snapshottable directory
* @param snapshotRoot The snapshottable directory
* @param snapshotName The name of the to-be-deleted snapshot
* @throws IOException
*/
static INode.BlocksMapUpdateInfo deleteSnapshot(
FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
String snapshotName, boolean logRetryCache)
throws IOException {
final FSPermissionChecker pc = fsd.getPermissionChecker();
INode.BlocksMapUpdateInfo collectedBlocks = new INode.BlocksMapUpdateInfo();
if (fsd.isPermissionEnabled()) {
fsd.checkOwner(pc, snapshotRoot);
}
ChunkedArrayList<INode> removedINodes = new ChunkedArrayList<INode>();
fsd.writeLock();
try {
snapshotManager.deleteSnapshot(snapshotRoot, snapshotName,
collectedBlocks, removedINodes);
fsd.removeFromInodeMap(removedINodes);
} finally {
fsd.writeUnlock();
}
removedINodes.clear();
fsd.getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
logRetryCache);
return collectedBlocks;
}
private static void checkSubtreeReadPermission(
FSDirectory fsd, final FSPermissionChecker pc, String snapshottablePath,
String snapshot) throws IOException {
final String fromPath = snapshot == null ?
snapshottablePath : Snapshot.getSnapshotPath(snapshottablePath,
snapshot);
fsd.checkPermission(pc, fromPath, false, null, null, FsAction.READ,
FsAction.READ);
}
/**
* Check if the given INode (or one of its descendants) is snapshottable and
* already has snapshots.
*
* @param target The given INode
* @param snapshottableDirs The list of directories that are snapshottable
* but do not have snapshots yet
*/
static void checkSnapshot(
INode target, List<INodeDirectory> snapshottableDirs)
throws SnapshotException {
if (target.isDirectory()) {
INodeDirectory targetDir = target.asDirectory();
DirectorySnapshottableFeature sf = targetDir
.getDirectorySnapshottableFeature();
if (sf != null) {
if (sf.getNumSnapshots() > 0) {
String fullPath = targetDir.getFullPathName();
throw new SnapshotException("The directory " + fullPath
+ " cannot be deleted since " + fullPath
+ " is snapshottable and already has snapshots");
} else {
if (snapshottableDirs != null) {
snapshottableDirs.add(targetDir);
}
}
}
for (INode child : targetDir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
checkSnapshot(child, snapshottableDirs);
}
}
}
}

View File

@ -686,7 +686,7 @@ public class FSDirectory implements Closeable {
List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>(); List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
if (dstInode != null) { // Destination exists if (dstInode != null) { // Destination exists
validateRenameOverwrite(src, dst, overwrite, srcInode, dstInode); validateRenameOverwrite(src, dst, overwrite, srcInode, dstInode);
checkSnapshot(dstInode, snapshottableDirs); FSDirSnapshotOp.checkSnapshot(dstInode, snapshottableDirs);
} }
INode dstParent = dstIIP.getINode(-2); INode dstParent = dstIIP.getINode(-2);
@ -863,7 +863,7 @@ public class FSDirectory implements Closeable {
} }
// srcInode and its subtree cannot contain snapshottable directories with // srcInode and its subtree cannot contain snapshottable directories with
// snapshots // snapshots
checkSnapshot(srcInode, null); FSDirSnapshotOp.checkSnapshot(srcInode, null);
} }
/** /**
@ -1208,7 +1208,7 @@ public class FSDirectory implements Closeable {
filesRemoved = -1; filesRemoved = -1;
} else { } else {
List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>(); List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs); FSDirSnapshotOp.checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks, filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
removedINodes, mtime); removedINodes, mtime);
namesystem.removeSnapshottableDirs(snapshottableDirs); namesystem.removeSnapshottableDirs(snapshottableDirs);
@ -1278,7 +1278,7 @@ public class FSDirectory implements Closeable {
long filesRemoved = -1; long filesRemoved = -1;
if (deleteAllowed(inodesInPath, src)) { if (deleteAllowed(inodesInPath, src)) {
List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>(); List<INodeDirectory> snapshottableDirs = new ArrayList<INodeDirectory>();
checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs); FSDirSnapshotOp.checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks, filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
removedINodes, mtime); removedINodes, mtime);
namesystem.removeSnapshottableDirs(snapshottableDirs); namesystem.removeSnapshottableDirs(snapshottableDirs);
@ -1342,38 +1342,6 @@ public class FSDirectory implements Closeable {
} }
return removed; return removed;
} }
/**
* Check if the given INode (or one of its descendants) is snapshottable and
* already has snapshots.
*
* @param target The given INode
* @param snapshottableDirs The list of directories that are snapshottable
* but do not have snapshots yet
*/
private static void checkSnapshot(INode target,
List<INodeDirectory> snapshottableDirs) throws SnapshotException {
if (target.isDirectory()) {
INodeDirectory targetDir = target.asDirectory();
DirectorySnapshottableFeature sf = targetDir
.getDirectorySnapshottableFeature();
if (sf != null) {
if (sf.getNumSnapshots() > 0) {
String fullPath = targetDir.getFullPathName();
throw new SnapshotException("The directory " + fullPath
+ " cannot be deleted since " + fullPath
+ " is snapshottable and already has snapshots");
} else {
if (snapshottableDirs != null) {
snapshottableDirs.add(targetDir);
}
}
}
for (INode child : targetDir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
checkSnapshot(child, snapshottableDirs);
}
}
}
private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) { private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy : return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy :
@ -3321,10 +3289,10 @@ public class FSDirectory implements Closeable {
* details of the parameters, see * details of the parameters, see
* {@link FSPermissionChecker#checkPermission}. * {@link FSPermissionChecker#checkPermission}.
*/ */
private void checkPermission( void checkPermission(
FSPermissionChecker pc, String path, boolean doCheckOwner, FSPermissionChecker pc, String path, boolean doCheckOwner,
FsAction ancestorAccess, FsAction parentAccess, FsAction access, FsAction ancestorAccess, FsAction parentAccess, FsAction access,
FsAction subAccess) FsAction subAccess)
throws AccessControlException, UnresolvedLinkException { throws AccessControlException, UnresolvedLinkException {
checkPermission(pc, path, doCheckOwner, ancestorAccess, checkPermission(pc, path, doCheckOwner, ancestorAccess,
parentAccess, access, subAccess, false, true); parentAccess, access, subAccess, false, true);

View File

@ -7594,55 +7594,39 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} }
/** Allow snapshot on a directory. */ /** Allow snapshot on a directory. */
void allowSnapshot(String path) throws SafeModeException, IOException { void allowSnapshot(String path) throws IOException {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
boolean success = false;
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot allow snapshot for " + path); checkNameNodeSafeMode("Cannot allow snapshot for " + path);
checkSuperuserPrivilege(); checkSuperuserPrivilege();
FSDirSnapshotOp.allowSnapshot(dir, snapshotManager, path);
dir.writeLock(); success = true;
try {
snapshotManager.setSnapshottable(path, true);
} finally {
dir.writeUnlock();
}
getEditLog().logAllowSnapshot(path);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
logAuditEvent(success, "allowSnapshot", path, null, null);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(true, "allowSnapshot", path, null, null);
}
} }
/** Disallow snapshot on a directory. */ /** Disallow snapshot on a directory. */
void disallowSnapshot(String path) throws SafeModeException, IOException { void disallowSnapshot(String path) throws IOException {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
boolean success = false;
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot disallow snapshot for " + path); checkNameNodeSafeMode("Cannot disallow snapshot for " + path);
checkSuperuserPrivilege(); checkSuperuserPrivilege();
FSDirSnapshotOp.disallowSnapshot(dir, snapshotManager, path);
dir.writeLock(); success = true;
try {
snapshotManager.resetSnapshottable(path);
} finally {
dir.writeUnlock();
}
getEditLog().logDisallowSnapshot(path);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
logAuditEvent(success, "disallowSnapshot", path, null, null);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(true, "disallowSnapshot", path, null, null);
}
} }
/** /**
@ -7651,45 +7635,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param snapshotName The name of the snapshot * @param snapshotName The name of the snapshot
*/ */
String createSnapshot(String snapshotRoot, String snapshotName, String createSnapshot(String snapshotRoot, String snapshotName,
boolean logRetryCache) boolean logRetryCache) throws IOException {
throws SafeModeException, IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
String snapshotPath = null; String snapshotPath = null;
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot); checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot);
if (isPermissionEnabled) { snapshotPath = FSDirSnapshotOp.createSnapshot(dir,
checkOwner(pc, snapshotRoot); snapshotManager, snapshotRoot, snapshotName, logRetryCache);
}
if (snapshotName == null || snapshotName.isEmpty()) {
snapshotName = Snapshot.generateDefaultSnapshotName();
}
if(snapshotName != null){
if (!DFSUtil.isValidNameForComponent(snapshotName)) {
throw new InvalidPathException("Invalid snapshot name: "
+ snapshotName);
}
}
dir.verifySnapshotName(snapshotName, snapshotRoot);
dir.writeLock();
try {
snapshotPath = snapshotManager.createSnapshot(snapshotRoot, snapshotName);
} finally {
dir.writeUnlock();
}
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName, logRetryCache);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
logAuditEvent(snapshotPath != null, "createSnapshot", snapshotRoot,
if (auditLog.isInfoEnabled() && isExternalInvocation()) { snapshotPath, null);
logAuditEvent(true, "createSnapshot", snapshotRoot, snapshotPath, null);
}
return snapshotPath; return snapshotPath;
} }
@ -7705,32 +7664,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String path, String snapshotOldName, String snapshotNewName, String path, String snapshotOldName, String snapshotNewName,
boolean logRetryCache) throws IOException { boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker(); boolean success = false;
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot rename snapshot for " + path); checkNameNodeSafeMode("Cannot rename snapshot for " + path);
if (isPermissionEnabled) { FSDirSnapshotOp.renameSnapshot(dir, snapshotManager, path,
checkOwner(pc, path); snapshotOldName, snapshotNewName, logRetryCache);
} success = true;
dir.verifySnapshotName(snapshotNewName, path);
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName,
logRetryCache);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
if (auditLog.isInfoEnabled() && isExternalInvocation()) { String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName); logAuditEvent(success, "renameSnapshot", oldSnapshotRoot,
String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName); newSnapshotRoot, null);
logAuditEvent(true, "renameSnapshot", oldSnapshotRoot, newSnapshotRoot, null);
}
} }
/** /**
@ -7744,18 +7693,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throws IOException { throws IOException {
SnapshottableDirectoryStatus[] status = null; SnapshottableDirectoryStatus[] status = null;
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
final FSPermissionChecker checker = getPermissionChecker(); boolean success = false;
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
final String user = checker.isSuperUser()? null : checker.getUser(); status = FSDirSnapshotOp.getSnapshottableDirListing(dir, snapshotManager);
status = snapshotManager.getSnapshottableDirListing(user); success = true;
} finally { } finally {
readUnlock(); readUnlock();
} }
if (auditLog.isInfoEnabled() && isExternalInvocation()) { logAuditEvent(success, "listSnapshottableDirectory", null, null, null);
logAuditEvent(true, "listSnapshottableDirectory", null, null, null);
}
return status; return status;
} }
@ -7776,35 +7723,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/ */
SnapshotDiffReport getSnapshotDiffReport(String path, SnapshotDiffReport getSnapshotDiffReport(String path,
String fromSnapshot, String toSnapshot) throws IOException { String fromSnapshot, String toSnapshot) throws IOException {
SnapshotDiffReport diffs; SnapshotDiffReport diffs = null;
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = getPermissionChecker();
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
if (isPermissionEnabled) { diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, snapshotManager,
checkSubtreeReadPermission(pc, path, fromSnapshot); path, fromSnapshot, toSnapshot);
checkSubtreeReadPermission(pc, path, toSnapshot);
}
diffs = snapshotManager.diff(path, fromSnapshot, toSnapshot);
} finally { } finally {
readUnlock(); readUnlock();
} }
if (auditLog.isInfoEnabled() && isExternalInvocation()) { logAuditEvent(diffs != null, "computeSnapshotDiff", null, null, null);
logAuditEvent(true, "computeSnapshotDiff", null, null, null);
}
return diffs; return diffs;
} }
private void checkSubtreeReadPermission(final FSPermissionChecker pc,
final String snapshottablePath, final String snapshot)
throws AccessControlException, UnresolvedLinkException {
final String fromPath = snapshot == null?
snapshottablePath: Snapshot.getSnapshotPath(snapshottablePath, snapshot);
checkPermission(pc, fromPath, false, null, null, FsAction.READ, FsAction.READ);
}
/** /**
* Delete a snapshot of a snapshottable directory * Delete a snapshot of a snapshottable directory
* @param snapshotRoot The snapshottable directory * @param snapshotRoot The snapshottable directory
@ -7812,45 +7745,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws SafeModeException * @throws SafeModeException
* @throws IOException * @throws IOException
*/ */
void deleteSnapshot(String snapshotRoot, String snapshotName, void deleteSnapshot(
boolean logRetryCache) String snapshotRoot, String snapshotName, boolean logRetryCache)
throws SafeModeException, IOException { throws IOException {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker(); boolean success = false;
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
writeLock(); writeLock();
BlocksMapUpdateInfo blocksToBeDeleted = null;
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot); checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
if (isPermissionEnabled) { blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, snapshotManager,
checkOwner(pc, snapshotRoot); snapshotRoot, snapshotName, logRetryCache);
} success = true;
List<INode> removedINodes = new ChunkedArrayList<INode>();
dir.writeLock();
try {
snapshotManager.deleteSnapshot(snapshotRoot, snapshotName,
collectedBlocks, removedINodes);
dir.removeFromInodeMap(removedINodes);
} finally {
dir.writeUnlock();
}
removedINodes.clear();
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName, logRetryCache);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
removeBlocks(collectedBlocks); // Breaking the pattern as removing blocks have to happen outside of the
collectedBlocks.clear(); // global lock
if (blocksToBeDeleted != null) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) { removeBlocks(blocksToBeDeleted);
String rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
logAuditEvent(true, "deleteSnapshot", rootPath, null, null);
} }
String rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
logAuditEvent(success, "deleteSnapshot", rootPath, null, null);
} }
/** /**