HDFS-7506. Consolidate implementation of setting inode attributes into a single class. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2014-12-15 10:40:33 -08:00
parent 6e13fc62e1
commit 832ebd8cb6
6 changed files with 572 additions and 524 deletions

View File

@ -590,6 +590,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7514. TestTextCommand fails on Windows. (Arpit Agarwal)
HDFS-7506. Consolidate implementation of setting inode attributes into a
single class. (wheat9)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -0,0 +1,455 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.security.AccessControlException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
public class FSDirAttrOp {
static HdfsFileStatus setPermission(
FSDirectory fsd, final String srcArg, FsPermission permission)
throws IOException {
String src = srcArg;
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
fsd.writeLock();
try {
src = fsd.resolvePath(pc, src, pathComponents);
final INodesInPath iip = fsd.getINodesInPath4Write(src);
fsd.checkOwner(pc, iip);
unprotectedSetPermission(fsd, src, permission);
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logSetPermissions(src, permission);
return fsd.getAuditFileInfo(src, false);
}
static HdfsFileStatus setOwner(
FSDirectory fsd, String src, String username, String group)
throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
fsd.writeLock();
try {
src = fsd.resolvePath(pc, src, pathComponents);
final INodesInPath iip = fsd.getINodesInPath4Write(src);
fsd.checkOwner(pc, iip);
if (!pc.isSuperUser()) {
if (username != null && !pc.getUser().equals(username)) {
throw new AccessControlException("Non-super user cannot change owner");
}
if (group != null && !pc.containsGroup(group)) {
throw new AccessControlException("User does not belong to " + group);
}
}
unprotectedSetOwner(fsd, src, username, group);
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logSetOwner(src, username, group);
return fsd.getAuditFileInfo(src, false);
}
static HdfsFileStatus setTimes(
FSDirectory fsd, String src, long mtime, long atime)
throws IOException {
if (!fsd.isAccessTimeSupported() && atime != -1) {
throw new IOException(
"Access time for hdfs is not configured. " +
" Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY
+ " configuration parameter.");
}
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
fsd.writeLock();
try {
src = fsd.resolvePath(pc, src, pathComponents);
final INodesInPath iip = fsd.getINodesInPath4Write(src);
// Write access is required to set access and modification times
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
final INode inode = iip.getLastINode();
if (inode == null) {
throw new FileNotFoundException("File/Directory " + src +
" does not exist.");
}
boolean changed = unprotectedSetTimes(fsd, inode, mtime, atime, true,
iip.getLatestSnapshotId());
if (changed) {
fsd.getEditLog().logTimes(src, mtime, atime);
}
} finally {
fsd.writeUnlock();
}
return fsd.getAuditFileInfo(src, false);
}
static boolean setReplication(
FSDirectory fsd, BlockManager bm, String src, final short replication)
throws IOException {
bm.verifyReplication(src, replication, null);
final boolean isFile;
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
fsd.writeLock();
try {
src = fsd.resolvePath(pc, src, pathComponents);
final INodesInPath iip = fsd.getINodesInPath4Write(src);
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
final short[] blockRepls = new short[2]; // 0: old, 1: new
final Block[] blocks = unprotectedSetReplication(fsd, src, replication,
blockRepls);
isFile = blocks != null;
if (isFile) {
fsd.getEditLog().logSetReplication(src, replication);
bm.setReplication(blockRepls[0], blockRepls[1], src, blocks);
}
} finally {
fsd.writeUnlock();
}
return isFile;
}
static HdfsFileStatus setStoragePolicy(
FSDirectory fsd, BlockManager bm, String src, final String policyName)
throws IOException {
if (!fsd.isStoragePolicyEnabled()) {
throw new IOException(
"Failed to set storage policy since "
+ DFS_STORAGE_POLICY_ENABLED_KEY + " is set to false.");
}
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
fsd.writeLock();
try {
src = FSDirectory.resolvePath(src, pathComponents, fsd);
final INodesInPath iip = fsd.getINodesInPath4Write(src);
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
// get the corresponding policy and make sure the policy name is valid
BlockStoragePolicy policy = bm.getStoragePolicy(policyName);
if (policy == null) {
throw new HadoopIllegalArgumentException(
"Cannot find a block policy with the name " + policyName);
}
unprotectedSetStoragePolicy(fsd, bm, iip, policy.getId());
fsd.getEditLog().logSetStoragePolicy(src, policy.getId());
} finally {
fsd.writeUnlock();
}
return fsd.getAuditFileInfo(src, false);
}
static BlockStoragePolicy[] getStoragePolicies(BlockManager bm)
throws IOException {
return bm.getStoragePolicies();
}
static long getPreferredBlockSize(FSDirectory fsd, String src)
throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
fsd.readLock();
try {
src = fsd.resolvePath(pc, src, pathComponents);
final INodesInPath iip = fsd.getINodesInPath(src, false);
if (fsd.isPermissionEnabled()) {
fsd.checkTraverse(pc, iip);
}
return INodeFile.valueOf(iip.getLastINode(), src)
.getPreferredBlockSize();
} finally {
fsd.readUnlock();
}
}
/**
* Set the namespace quota and diskspace quota for a directory.
*
* Note: This does not support ".inodes" relative path.
*/
static void setQuota(FSDirectory fsd, String src, long nsQuota, long dsQuota)
throws IOException {
if (fsd.isPermissionEnabled()) {
FSPermissionChecker pc = fsd.getPermissionChecker();
pc.checkSuperuserPrivilege();
}
fsd.writeLock();
try {
INodeDirectory changed = unprotectedSetQuota(fsd, src, nsQuota, dsQuota);
if (changed != null) {
final Quota.Counts q = changed.getQuotaCounts();
fsd.getEditLog().logSetQuota(
src, q.get(Quota.NAMESPACE), q.get(Quota.DISKSPACE));
}
} finally {
fsd.writeUnlock();
}
}
static void unprotectedSetPermission(
FSDirectory fsd, String src, FsPermission permissions)
throws FileNotFoundException, UnresolvedLinkException,
QuotaExceededException, SnapshotAccessControlException {
assert fsd.hasWriteLock();
final INodesInPath inodesInPath = fsd.getINodesInPath4Write(src, true);
final INode inode = inodesInPath.getLastINode();
if (inode == null) {
throw new FileNotFoundException("File does not exist: " + src);
}
int snapshotId = inodesInPath.getLatestSnapshotId();
inode.setPermission(permissions, snapshotId);
}
static void unprotectedSetOwner(
FSDirectory fsd, String src, String username, String groupname)
throws FileNotFoundException, UnresolvedLinkException,
QuotaExceededException, SnapshotAccessControlException {
assert fsd.hasWriteLock();
final INodesInPath inodesInPath = fsd.getINodesInPath4Write(src, true);
INode inode = inodesInPath.getLastINode();
if (inode == null) {
throw new FileNotFoundException("File does not exist: " + src);
}
if (username != null) {
inode = inode.setUser(username, inodesInPath.getLatestSnapshotId());
}
if (groupname != null) {
inode.setGroup(groupname, inodesInPath.getLatestSnapshotId());
}
}
static boolean setTimes(
FSDirectory fsd, INode inode, long mtime, long atime, boolean force,
int latestSnapshotId) throws QuotaExceededException {
fsd.writeLock();
try {
return unprotectedSetTimes(fsd, inode, mtime, atime, force,
latestSnapshotId);
} finally {
fsd.writeUnlock();
}
}
static boolean unprotectedSetTimes(
FSDirectory fsd, String src, long mtime, long atime, boolean force)
throws UnresolvedLinkException, QuotaExceededException {
assert fsd.hasWriteLock();
final INodesInPath i = fsd.getINodesInPath(src, true);
return unprotectedSetTimes(fsd, i.getLastINode(), mtime, atime,
force, i.getLatestSnapshotId());
}
/**
* See {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long)}
* for the contract.
* Sets quota for for a directory.
* @return INodeDirectory if any of the quotas have changed. null otherwise.
* @throws FileNotFoundException if the path does not exist.
* @throws PathIsNotDirectoryException if the path is not a directory.
* @throws QuotaExceededException if the directory tree size is
* greater than the given quota
* @throws UnresolvedLinkException if a symlink is encountered in src.
* @throws SnapshotAccessControlException if path is in RO snapshot
*/
static INodeDirectory unprotectedSetQuota(
FSDirectory fsd, String src, long nsQuota, long dsQuota)
throws FileNotFoundException, PathIsNotDirectoryException,
QuotaExceededException, UnresolvedLinkException,
SnapshotAccessControlException {
assert fsd.hasWriteLock();
// sanity check
if ((nsQuota < 0 && nsQuota != HdfsConstants.QUOTA_DONT_SET &&
nsQuota != HdfsConstants.QUOTA_RESET) ||
(dsQuota < 0 && dsQuota != HdfsConstants.QUOTA_DONT_SET &&
dsQuota != HdfsConstants.QUOTA_RESET)) {
throw new IllegalArgumentException("Illegal value for nsQuota or " +
"dsQuota : " + nsQuota + " and " +
dsQuota);
}
String srcs = FSDirectory.normalizePath(src);
final INodesInPath iip = fsd.getINodesInPath4Write(srcs, true);
INodeDirectory dirNode = INodeDirectory.valueOf(iip.getLastINode(), srcs);
if (dirNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
throw new IllegalArgumentException("Cannot clear namespace quota on root.");
} else { // a directory inode
final Quota.Counts oldQuota = dirNode.getQuotaCounts();
final long oldNsQuota = oldQuota.get(Quota.NAMESPACE);
final long oldDsQuota = oldQuota.get(Quota.DISKSPACE);
if (nsQuota == HdfsConstants.QUOTA_DONT_SET) {
nsQuota = oldNsQuota;
}
if (dsQuota == HdfsConstants.QUOTA_DONT_SET) {
dsQuota = oldDsQuota;
}
if (oldNsQuota == nsQuota && oldDsQuota == dsQuota) {
return null;
}
final int latest = iip.getLatestSnapshotId();
dirNode.recordModification(latest);
dirNode.setQuota(nsQuota, dsQuota);
return dirNode;
}
}
static Block[] unprotectedSetReplication(
FSDirectory fsd, String src, short replication, short[] blockRepls)
throws QuotaExceededException, UnresolvedLinkException,
SnapshotAccessControlException {
assert fsd.hasWriteLock();
final INodesInPath iip = fsd.getINodesInPath4Write(src, true);
final INode inode = iip.getLastINode();
if (inode == null || !inode.isFile()) {
return null;
}
INodeFile file = inode.asFile();
final short oldBR = file.getBlockReplication();
// before setFileReplication, check for increasing block replication.
// if replication > oldBR, then newBR == replication.
// if replication < oldBR, we don't know newBR yet.
if (replication > oldBR) {
long dsDelta = (replication - oldBR)*(file.diskspaceConsumed()/oldBR);
fsd.updateCount(iip, 0, dsDelta, true);
}
file.setFileReplication(replication, iip.getLatestSnapshotId());
final short newBR = file.getBlockReplication();
// check newBR < oldBR case.
if (newBR < oldBR) {
long dsDelta = (newBR - oldBR)*(file.diskspaceConsumed()/newBR);
fsd.updateCount(iip, 0, dsDelta, true);
}
if (blockRepls != null) {
blockRepls[0] = oldBR;
blockRepls[1] = newBR;
}
return file.getBlocks();
}
static void unprotectedSetStoragePolicy(
FSDirectory fsd, BlockManager bm, INodesInPath iip, byte policyId)
throws IOException {
assert fsd.hasWriteLock();
final INode inode = iip.getLastINode();
if (inode == null) {
throw new FileNotFoundException("File/Directory does not exist: "
+ iip.getPath());
}
final int snapshotId = iip.getLatestSnapshotId();
if (inode.isFile()) {
BlockStoragePolicy newPolicy = bm.getStoragePolicy(policyId);
if (newPolicy.isCopyOnCreateFile()) {
throw new HadoopIllegalArgumentException(
"Policy " + newPolicy + " cannot be set after file creation.");
}
BlockStoragePolicy currentPolicy =
bm.getStoragePolicy(inode.getLocalStoragePolicyID());
if (currentPolicy != null && currentPolicy.isCopyOnCreateFile()) {
throw new HadoopIllegalArgumentException(
"Existing policy " + currentPolicy.getName() +
" cannot be changed after file creation.");
}
inode.asFile().setStoragePolicyID(policyId, snapshotId);
} else if (inode.isDirectory()) {
setDirStoragePolicy(fsd, inode.asDirectory(), policyId, snapshotId);
} else {
throw new FileNotFoundException(iip.getPath()
+ " is not a file or directory");
}
}
private static void setDirStoragePolicy(
FSDirectory fsd, INodeDirectory inode, byte policyId,
int latestSnapshotId) throws IOException {
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
XAttr xAttr = BlockStoragePolicySuite.buildXAttr(policyId);
List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs,
Arrays.asList(xAttr),
EnumSet.of(
XAttrSetFlag.CREATE,
XAttrSetFlag.REPLACE));
XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
}
private static boolean unprotectedSetTimes(
FSDirectory fsd, INode inode, long mtime, long atime, boolean force,
int latest) throws QuotaExceededException {
assert fsd.hasWriteLock();
boolean status = false;
if (mtime != -1) {
inode = inode.setModificationTime(mtime, latest);
status = true;
}
if (atime != -1) {
long inodeTime = inode.getAccessTime();
// if the last access time update was within the last precision interval, then
// no need to store access time
if (atime <= inodeTime + fsd.getFSNamesystem().getAccessTimePrecision()
&& !force) {
status = false;
} else {
inode.setAccessTime(atime, latest);
status = true;
}
}
return status;
}
}

View File

@ -139,6 +139,11 @@ class FSDirStatAndListingOp {
return getContentSummaryInt(fsd, iip);
}
private static byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy :
parentPolicy;
}
/**
* Get a partial listing of the indicated directory
*
@ -196,7 +201,7 @@ class FSDirStatAndListingOp {
cur.getLocalStoragePolicyID():
BlockStoragePolicySuite.ID_UNSPECIFIED;
listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(), cur,
needLocation, fsd.getStoragePolicyID(curPolicy,
needLocation, getStoragePolicyID(curPolicy,
parentStoragePolicy), snapshot, isRawPath, iip);
listingCnt++;
if (needLocation) {

View File

@ -17,20 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
import static org.apache.hadoop.util.Time.now;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
@ -41,7 +30,6 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
@ -54,8 +42,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
@ -68,22 +54,35 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ByteArray;
import org.apache.hadoop.hdfs.util.ChunkedArrayList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
import static org.apache.hadoop.util.Time.now;
/**
* Both FSDirectory and FSNamesystem manage the state of the namespace.
* FSDirectory is a pure in-memory data structure, all of whose operations
@ -145,6 +144,12 @@ public class FSDirectory implements Closeable {
private final boolean aclsEnabled;
private final boolean xattrsEnabled;
private final int xattrMaxSize;
// precision of access times.
private final long accessTimePrecision;
// whether setStoragePolicy is allowed.
private final boolean storagePolicyEnabled;
private final String fsOwnerShortUserName;
private final String supergroup;
private final INodeId inodeId;
@ -222,6 +227,15 @@ public class FSDirectory implements Closeable {
DFSConfigKeys.DFS_NAMENODE_MAX_XATTR_SIZE_KEY);
final String unlimited = xattrMaxSize == 0 ? " (unlimited)" : "";
LOG.info("Maximum size of an xattr: " + xattrMaxSize + unlimited);
this.accessTimePrecision = conf.getLong(
DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT);
this.storagePolicyEnabled =
conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
DFS_STORAGE_POLICY_ENABLED_DEFAULT);
int configuredLimit = conf.getInt(
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
this.lsLimit = configuredLimit>0 ?
@ -287,6 +301,13 @@ public class FSDirectory implements Closeable {
return xattrsEnabled;
}
int getXattrMaxSize() { return xattrMaxSize; }
boolean isStoragePolicyEnabled() {
return storagePolicyEnabled;
}
boolean isAccessTimeSupported() {
return accessTimePrecision > 0;
}
int getLsLimit() {
return lsLimit;
@ -523,172 +544,6 @@ public class FSDirectory implements Closeable {
return resolvePath(path, pathComponents, this);
}
/**
* Set file replication
*
* @param src file name
* @param replication new replication
* @param blockRepls block replications - output parameter
* @return array of file blocks
* @throws QuotaExceededException
* @throws SnapshotAccessControlException
*/
Block[] setReplication(String src, short replication, short[] blockRepls)
throws QuotaExceededException, UnresolvedLinkException,
SnapshotAccessControlException {
writeLock();
try {
return unprotectedSetReplication(src, replication, blockRepls);
} finally {
writeUnlock();
}
}
Block[] unprotectedSetReplication(String src, short replication,
short[] blockRepls) throws QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException {
assert hasWriteLock();
final INodesInPath iip = getINodesInPath4Write(src, true);
final INode inode = iip.getLastINode();
if (inode == null || !inode.isFile()) {
return null;
}
INodeFile file = inode.asFile();
final short oldBR = file.getBlockReplication();
// before setFileReplication, check for increasing block replication.
// if replication > oldBR, then newBR == replication.
// if replication < oldBR, we don't know newBR yet.
if (replication > oldBR) {
long dsDelta = (replication - oldBR)*(file.diskspaceConsumed()/oldBR);
updateCount(iip, 0, dsDelta, true);
}
file.setFileReplication(replication, iip.getLatestSnapshotId());
final short newBR = file.getBlockReplication();
// check newBR < oldBR case.
if (newBR < oldBR) {
long dsDelta = (newBR - oldBR)*(file.diskspaceConsumed()/newBR);
updateCount(iip, 0, dsDelta, true);
}
if (blockRepls != null) {
blockRepls[0] = oldBR;
blockRepls[1] = newBR;
}
return file.getBlocks();
}
/** Set block storage policy for a directory */
void setStoragePolicy(INodesInPath iip, byte policyId)
throws IOException {
writeLock();
try {
unprotectedSetStoragePolicy(iip, policyId);
} finally {
writeUnlock();
}
}
void unprotectedSetStoragePolicy(INodesInPath iip, byte policyId)
throws IOException {
assert hasWriteLock();
final INode inode = iip.getLastINode();
if (inode == null) {
throw new FileNotFoundException("File/Directory does not exist: "
+ iip.getPath());
}
final int snapshotId = iip.getLatestSnapshotId();
if (inode.isFile()) {
BlockStoragePolicy newPolicy = getBlockManager().getStoragePolicy(policyId);
if (newPolicy.isCopyOnCreateFile()) {
throw new HadoopIllegalArgumentException(
"Policy " + newPolicy + " cannot be set after file creation.");
}
BlockStoragePolicy currentPolicy =
getBlockManager().getStoragePolicy(inode.getLocalStoragePolicyID());
if (currentPolicy != null && currentPolicy.isCopyOnCreateFile()) {
throw new HadoopIllegalArgumentException(
"Existing policy " + currentPolicy.getName() +
" cannot be changed after file creation.");
}
inode.asFile().setStoragePolicyID(policyId, snapshotId);
} else if (inode.isDirectory()) {
setDirStoragePolicy(inode.asDirectory(), policyId, snapshotId);
} else {
throw new FileNotFoundException(iip.getPath()
+ " is not a file or directory");
}
}
private void setDirStoragePolicy(INodeDirectory inode, byte policyId,
int latestSnapshotId) throws IOException {
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
XAttr xAttr = BlockStoragePolicySuite.buildXAttr(policyId);
List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(this, existingXAttrs,
Arrays.asList(xAttr),
EnumSet.of(
XAttrSetFlag.CREATE,
XAttrSetFlag.REPLACE));
XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId);
}
void setPermission(String src, FsPermission permission)
throws FileNotFoundException, UnresolvedLinkException,
QuotaExceededException, SnapshotAccessControlException {
writeLock();
try {
unprotectedSetPermission(src, permission);
} finally {
writeUnlock();
}
}
void unprotectedSetPermission(String src, FsPermission permissions)
throws FileNotFoundException, UnresolvedLinkException,
QuotaExceededException, SnapshotAccessControlException {
assert hasWriteLock();
final INodesInPath inodesInPath = getINodesInPath4Write(src, true);
final INode inode = inodesInPath.getLastINode();
if (inode == null) {
throw new FileNotFoundException("File does not exist: " + src);
}
int snapshotId = inodesInPath.getLatestSnapshotId();
inode.setPermission(permissions, snapshotId);
}
void setOwner(String src, String username, String groupname)
throws FileNotFoundException, UnresolvedLinkException,
QuotaExceededException, SnapshotAccessControlException {
writeLock();
try {
unprotectedSetOwner(src, username, groupname);
} finally {
writeUnlock();
}
}
void unprotectedSetOwner(String src, String username, String groupname)
throws FileNotFoundException, UnresolvedLinkException,
QuotaExceededException, SnapshotAccessControlException {
assert hasWriteLock();
final INodesInPath inodesInPath = getINodesInPath4Write(src, true);
INode inode = inodesInPath.getLastINode();
if (inode == null) {
throw new FileNotFoundException("File does not exist: " + src);
}
if (username != null) {
inode = inode.setUser(username, inodesInPath.getLatestSnapshotId());
}
if (groupname != null) {
inode.setGroup(groupname, inodesInPath.getLatestSnapshotId());
}
}
/**
* Delete the target directory and collect the blocks under it
*
@ -841,11 +696,6 @@ public class FSDirectory implements Closeable {
return removed;
}
byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy :
parentPolicy;
}
/**
* Check whether the filepath could be created
* @throws SnapshotAccessControlException if path is in RO snapshot
@ -895,7 +745,7 @@ public class FSDirectory implements Closeable {
}
}
private void updateCount(INodesInPath iip, long nsDelta, long dsDelta,
void updateCount(INodesInPath iip, long nsDelta, long dsDelta,
boolean checkQuota) throws QuotaExceededException {
updateCount(iip, iip.length() - 1, nsDelta, dsDelta, checkQuota);
}
@ -1315,77 +1165,7 @@ public class FSDirectory implements Closeable {
int getInodeMapSize() {
return inodeMap.size();
}
/**
* See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
* Sets quota for for a directory.
* @return INodeDirectory if any of the quotas have changed. null otherwise.
* @throws FileNotFoundException if the path does not exist.
* @throws PathIsNotDirectoryException if the path is not a directory.
* @throws QuotaExceededException if the directory tree size is
* greater than the given quota
* @throws UnresolvedLinkException if a symlink is encountered in src.
* @throws SnapshotAccessControlException if path is in RO snapshot
*/
INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota)
throws FileNotFoundException, PathIsNotDirectoryException,
QuotaExceededException, UnresolvedLinkException,
SnapshotAccessControlException {
assert hasWriteLock();
// sanity check
if ((nsQuota < 0 && nsQuota != HdfsConstants.QUOTA_DONT_SET &&
nsQuota != HdfsConstants.QUOTA_RESET) ||
(dsQuota < 0 && dsQuota != HdfsConstants.QUOTA_DONT_SET &&
dsQuota != HdfsConstants.QUOTA_RESET)) {
throw new IllegalArgumentException("Illegal value for nsQuota or " +
"dsQuota : " + nsQuota + " and " +
dsQuota);
}
String srcs = normalizePath(src);
final INodesInPath iip = getINodesInPath4Write(srcs, true);
INodeDirectory dirNode = INodeDirectory.valueOf(iip.getLastINode(), srcs);
if (dirNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
throw new IllegalArgumentException("Cannot clear namespace quota on root.");
} else { // a directory inode
final Quota.Counts oldQuota = dirNode.getQuotaCounts();
final long oldNsQuota = oldQuota.get(Quota.NAMESPACE);
final long oldDsQuota = oldQuota.get(Quota.DISKSPACE);
if (nsQuota == HdfsConstants.QUOTA_DONT_SET) {
nsQuota = oldNsQuota;
}
if (dsQuota == HdfsConstants.QUOTA_DONT_SET) {
dsQuota = oldDsQuota;
}
if (oldNsQuota == nsQuota && oldDsQuota == dsQuota) {
return null;
}
final int latest = iip.getLatestSnapshotId();
dirNode.recordModification(latest);
dirNode.setQuota(nsQuota, dsQuota);
return dirNode;
}
}
/**
* See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
* @return INodeDirectory if any of the quotas have changed. null otherwise.
* @throws SnapshotAccessControlException if path is in RO snapshot
* @see #unprotectedSetQuota(String, long, long)
*/
INodeDirectory setQuota(String src, long nsQuota, long dsQuota)
throws FileNotFoundException, PathIsNotDirectoryException,
QuotaExceededException, UnresolvedLinkException,
SnapshotAccessControlException {
writeLock();
try {
return unprotectedSetQuota(src, nsQuota, dsQuota);
} finally {
writeUnlock();
}
}
long totalInodes() {
readLock();
try {
@ -1396,50 +1176,6 @@ public class FSDirectory implements Closeable {
}
}
/**
* Sets the access time on the file/directory. Logs it in the transaction log.
*/
boolean setTimes(INode inode, long mtime, long atime, boolean force,
int latestSnapshotId) throws QuotaExceededException {
writeLock();
try {
return unprotectedSetTimes(inode, mtime, atime, force, latestSnapshotId);
} finally {
writeUnlock();
}
}
boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force)
throws UnresolvedLinkException, QuotaExceededException {
assert hasWriteLock();
final INodesInPath i = getINodesInPath(src, true);
return unprotectedSetTimes(i.getLastINode(), mtime, atime, force,
i.getLatestSnapshotId());
}
private boolean unprotectedSetTimes(INode inode, long mtime,
long atime, boolean force, int latest) throws QuotaExceededException {
assert hasWriteLock();
boolean status = false;
if (mtime != -1) {
inode = inode.setModificationTime(mtime, latest);
status = true;
}
if (atime != -1) {
long inodeTime = inode.getAccessTime();
// if the last access time update was within the last precision interval, then
// no need to store access time
if (atime <= inodeTime + getFSNamesystem().getAccessTimePrecision() && !force) {
status = false;
} else {
inode.setAccessTime(atime, latest);
status = true;
}
}
return status;
}
/**
* Reset the entire namespace tree.
*/

View File

@ -482,9 +482,8 @@ public class FSEditLogLoader {
SetReplicationOp setReplicationOp = (SetReplicationOp)op;
short replication = fsNamesys.getBlockManager().adjustReplication(
setReplicationOp.replication);
fsDir.unprotectedSetReplication(
renameReservedPathsOnUpgrade(setReplicationOp.path, logVersion),
replication, null);
FSDirAttrOp.unprotectedSetReplication(fsDir, renameReservedPathsOnUpgrade(
setReplicationOp.path, logVersion), replication, null);
break;
}
case OP_CONCAT_DELETE: {
@ -542,45 +541,42 @@ public class FSEditLogLoader {
}
case OP_SET_PERMISSIONS: {
SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
fsDir.unprotectedSetPermission(
renameReservedPathsOnUpgrade(setPermissionsOp.src, logVersion),
setPermissionsOp.permissions);
FSDirAttrOp.unprotectedSetPermission(fsDir, renameReservedPathsOnUpgrade(
setPermissionsOp.src, logVersion), setPermissionsOp.permissions);
break;
}
case OP_SET_OWNER: {
SetOwnerOp setOwnerOp = (SetOwnerOp)op;
fsDir.unprotectedSetOwner(
renameReservedPathsOnUpgrade(setOwnerOp.src, logVersion),
FSDirAttrOp.unprotectedSetOwner(
fsDir, renameReservedPathsOnUpgrade(setOwnerOp.src, logVersion),
setOwnerOp.username, setOwnerOp.groupname);
break;
}
case OP_SET_NS_QUOTA: {
SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
fsDir.unprotectedSetQuota(
renameReservedPathsOnUpgrade(setNSQuotaOp.src, logVersion),
FSDirAttrOp.unprotectedSetQuota(
fsDir, renameReservedPathsOnUpgrade(setNSQuotaOp.src, logVersion),
setNSQuotaOp.nsQuota, HdfsConstants.QUOTA_DONT_SET);
break;
}
case OP_CLEAR_NS_QUOTA: {
ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
fsDir.unprotectedSetQuota(
renameReservedPathsOnUpgrade(clearNSQuotaOp.src, logVersion),
FSDirAttrOp.unprotectedSetQuota(
fsDir, renameReservedPathsOnUpgrade(clearNSQuotaOp.src, logVersion),
HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_DONT_SET);
break;
}
case OP_SET_QUOTA:
SetQuotaOp setQuotaOp = (SetQuotaOp)op;
fsDir.unprotectedSetQuota(
renameReservedPathsOnUpgrade(setQuotaOp.src, logVersion),
setQuotaOp.nsQuota, setQuotaOp.dsQuota);
FSDirAttrOp.unprotectedSetQuota(fsDir, renameReservedPathsOnUpgrade(
setQuotaOp.src, logVersion), setQuotaOp.nsQuota, setQuotaOp.dsQuota);
break;
case OP_TIMES: {
TimesOp timesOp = (TimesOp)op;
fsDir.unprotectedSetTimes(
renameReservedPathsOnUpgrade(timesOp.path, logVersion),
FSDirAttrOp.unprotectedSetTimes(
fsDir, renameReservedPathsOnUpgrade(timesOp.path, logVersion),
timesOp.mtime, timesOp.atime, true);
break;
}
@ -856,7 +852,9 @@ public class FSEditLogLoader {
final String path = renameReservedPathsOnUpgrade(setStoragePolicyOp.path,
logVersion);
final INodesInPath iip = fsDir.getINodesInPath4Write(path);
fsDir.unprotectedSetStoragePolicy(iip, setStoragePolicyOp.policyId);
FSDirAttrOp.unprotectedSetStoragePolicy(
fsDir, fsNamesys.getBlockManager(), iip,
setStoragePolicyOp.policyId);
break;
}
default:

View File

@ -86,8 +86,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROU
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
import static org.apache.hadoop.util.Time.now;
@ -428,9 +426,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private final CacheManager cacheManager;
private final DatanodeStatistics datanodeStatistics;
// whether setStoragePolicy is allowed.
private final boolean isStoragePolicyEnabled;
private String nameserviceId;
private RollingUpgradeInfo rollingUpgradeInfo = null;
@ -753,10 +748,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.blockIdManager = new BlockIdManager(blockManager);
this.isStoragePolicyEnabled =
conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
DFS_STORAGE_POLICY_ENABLED_DEFAULT);
this.fsOwner = UserGroupInformation.getCurrentUser();
this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
@ -1674,36 +1665,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws IOException
*/
void setPermission(String src, FsPermission permission) throws IOException {
try {
setPermissionInt(src, permission);
} catch (AccessControlException e) {
logAuditEvent(false, "setPermission", src);
throw e;
}
}
private void setPermissionInt(final String srcArg, FsPermission permission)
throws IOException {
String src = srcArg;
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
HdfsFileStatus auditStat;
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set permission for " + src);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath4Write(src);
dir.checkOwner(pc, iip);
dir.setPermission(src, permission);
getEditLog().logSetPermissions(src, permission);
resultingStat = getAuditFileInfo(src, false);
auditStat = FSDirAttrOp.setPermission(dir, src, permission);
} catch (AccessControlException e) {
logAuditEvent(false, "setPermission", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
logAuditEvent(true, "setPermission", srcArg, null, resultingStat);
logAuditEvent(true, "setPermission", src, null, auditStat);
}
/**
@ -1712,44 +1688,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
void setOwner(String src, String username, String group)
throws IOException {
try {
setOwnerInt(src, username, group);
} catch (AccessControlException e) {
logAuditEvent(false, "setOwner", src);
throw e;
}
}
private void setOwnerInt(final String srcArg, String username, String group)
throws IOException {
String src = srcArg;
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
HdfsFileStatus auditStat;
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set owner for " + src);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath4Write(src);
dir.checkOwner(pc, iip);
if (!pc.isSuperUser()) {
if (username != null && !pc.getUser().equals(username)) {
throw new AccessControlException("Non-super user cannot change owner");
}
if (group != null && !pc.containsGroup(group)) {
throw new AccessControlException("User does not belong to " + group);
}
}
dir.setOwner(src, username, group);
getEditLog().logSetOwner(src, username, group);
resultingStat = getAuditFileInfo(src, false);
auditStat = FSDirAttrOp.setOwner(dir, src, username, group);
} catch (AccessControlException e) {
logAuditEvent(false, "setOwner", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
logAuditEvent(true, "setOwner", srcArg, null, resultingStat);
logAuditEvent(true, "setOwner", src, null, auditStat);
}
static class GetBlockLocationsResult {
@ -1794,7 +1747,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean updateAccessTime = now > inode.getAccessTime() +
getAccessTimePrecision();
if (!isInSafeMode() && updateAccessTime) {
boolean changed = dir.setTimes(
boolean changed = FSDirAttrOp.setTimes(dir,
inode, -1, now, false, res.iip.getLatestSnapshotId());
if (changed) {
getEditLog().logTimes(src, -1, now);
@ -1941,52 +1894,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* The access time is precise up to an hour. The transaction, if needed, is
* written to the edits log but is not flushed.
*/
void setTimes(String src, long mtime, long atime)
throws IOException, UnresolvedLinkException {
if (!isAccessTimeSupported() && atime != -1) {
throw new IOException("Access time for hdfs is not configured. " +
" Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
}
try {
setTimesInt(src, mtime, atime);
} catch (AccessControlException e) {
logAuditEvent(false, "setTimes", src);
throw e;
}
}
private void setTimesInt(final String srcArg, long mtime, long atime)
throws IOException {
String src = srcArg;
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
void setTimes(String src, long mtime, long atime) throws IOException {
HdfsFileStatus auditStat;
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set times " + src);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath4Write(src);
// Write access is required to set access and modification times
if (isPermissionEnabled) {
dir.checkPathAccess(pc, iip, FsAction.WRITE);
}
final INode inode = iip.getLastINode();
if (inode != null) {
boolean changed = dir.setTimes(inode, mtime, atime, true,
iip.getLatestSnapshotId());
if (changed) {
getEditLog().logTimes(src, mtime, atime);
}
resultingStat = getAuditFileInfo(src, false);
} else {
throw new FileNotFoundException("File/Directory " + src + " does not exist.");
}
auditStat = FSDirAttrOp.setTimes(dir, src, mtime, atime);
} catch (AccessControlException e) {
logAuditEvent(false, "setTimes", src);
throw e;
} finally {
writeUnlock();
}
logAuditEvent(true, "setTimes", srcArg, null, resultingStat);
getEditLog().logSync();
logAuditEvent(true, "setTimes", src, null, auditStat);
}
/**
@ -2066,49 +1989,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
boolean setReplication(final String src, final short replication)
throws IOException {
try {
return setReplicationInt(src, replication);
} catch (AccessControlException e) {
logAuditEvent(false, "setReplication", src);
throw e;
}
}
private boolean setReplicationInt(final String srcArg,
final short replication) throws IOException {
String src = srcArg;
blockManager.verifyReplication(src, replication, null);
final boolean isFile;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean success = false;
waitForLoadingFSImage();
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set replication for " + src);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath4Write(src);
if (isPermissionEnabled) {
dir.checkPathAccess(pc, iip, FsAction.WRITE);
}
final short[] blockRepls = new short[2]; // 0: old, 1: new
final Block[] blocks = dir.setReplication(src, replication, blockRepls);
isFile = blocks != null;
if (isFile) {
getEditLog().logSetReplication(src, replication);
blockManager.setReplication(blockRepls[0], blockRepls[1], src, blocks);
}
success = FSDirAttrOp.setReplication(dir, blockManager, src, replication);
} catch (AccessControlException e) {
logAuditEvent(false, "setReplication", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
if (isFile) {
logAuditEvent(true, "setReplication", srcArg);
if (success) {
getEditLog().logSync();
logAuditEvent(true, "setReplication", src);
}
return isFile;
return success;
}
/**
@ -2117,58 +2016,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param src file/directory path
* @param policyName storage policy name
*/
void setStoragePolicy(String src, final String policyName)
throws IOException {
try {
setStoragePolicyInt(src, policyName);
} catch (AccessControlException e) {
logAuditEvent(false, "setStoragePolicy", src);
throw e;
}
}
private void setStoragePolicyInt(String src, final String policyName)
throws IOException {
if (!isStoragePolicyEnabled) {
throw new IOException("Failed to set storage policy since "
+ DFS_STORAGE_POLICY_ENABLED_KEY + " is set to false.");
}
FSPermissionChecker pc = null;
if (isPermissionEnabled) {
pc = getPermissionChecker();
}
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
void setStoragePolicy(String src, String policyName) throws IOException {
HdfsFileStatus auditStat;
waitForLoadingFSImage();
HdfsFileStatus fileStat;
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set storage policy for " + src);
src = FSDirectory.resolvePath(src, pathComponents, dir);
final INodesInPath iip = dir.getINodesInPath4Write(src);
if (pc != null) {
dir.checkPermission(pc, iip, false, null, null, FsAction.WRITE, null, false);
}
// get the corresponding policy and make sure the policy name is valid
BlockStoragePolicy policy = blockManager.getStoragePolicy(policyName);
if (policy == null) {
throw new HadoopIllegalArgumentException(
"Cannot find a block policy with the name " + policyName);
}
dir.setStoragePolicy(iip, policy.getId());
getEditLog().logSetStoragePolicy(src, policy.getId());
fileStat = getAuditFileInfo(src, false);
auditStat = FSDirAttrOp.setStoragePolicy(
dir, blockManager, src, policyName);
} catch (AccessControlException e) {
logAuditEvent(false, "setStoragePolicy", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
logAuditEvent(true, "setStoragePolicy", src, null, fileStat);
logAuditEvent(true, "setStoragePolicy", src, null, auditStat);
}
/**
@ -2180,26 +2045,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
readLock();
try {
checkOperation(OperationCategory.READ);
return blockManager.getStoragePolicies();
return FSDirAttrOp.getStoragePolicies(blockManager);
} finally {
readUnlock();
}
}
long getPreferredBlockSize(String filename) throws IOException {
FSPermissionChecker pc = getPermissionChecker();
long getPreferredBlockSize(String src) throws IOException {
checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(filename);
readLock();
try {
checkOperation(OperationCategory.READ);
filename = dir.resolvePath(pc, filename, pathComponents);
final INodesInPath iip = dir.getINodesInPath(filename, false);
if (isPermissionEnabled) {
dir.checkTraverse(pc, iip);
}
return INodeFile.valueOf(iip.getLastINode(), filename)
.getPreferredBlockSize();
return FSDirAttrOp.getPreferredBlockSize(dir, src);
} finally {
readUnlock();
}
@ -3857,20 +3714,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*
* Note: This does not support ".inodes" relative path.
*/
void setQuota(String path, long nsQuota, long dsQuota)
throws IOException, UnresolvedLinkException {
checkSuperuserPrivilege();
void setQuota(String src, long nsQuota, long dsQuota)
throws IOException {
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set quota on " + path);
INodeDirectory changed = dir.setQuota(path, nsQuota, dsQuota);
if (changed != null) {
final Quota.Counts q = changed.getQuotaCounts();
getEditLog().logSetQuota(path,
q.get(Quota.NAMESPACE), q.get(Quota.DISKSPACE));
}
checkNameNodeSafeMode("Cannot set quota on " + src);
FSDirAttrOp.setQuota(dir, src, nsQuota, dsQuota);
} finally {
writeUnlock();
}