HDFS-7436. Consolidate implementation of concat(). Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2014-11-24 15:42:38 -08:00
parent e37a4ff0c1
commit 8caf537afa
5 changed files with 268 additions and 242 deletions

View File

@ -393,6 +393,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7419. Improve error messages for DataNode hot swap drive feature (Lei
Xu via Colin P. Mccabe)
HDFS-7436. Consolidate implementation of concat(). (wheat9)
OPTIMIZATIONS
BUG FIXES

View File

@ -0,0 +1,233 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import static org.apache.hadoop.util.Time.now;
class FSDirConcatOp {
static HdfsFileStatus concat(
FSDirectory fsd, String target, String[] srcs,
boolean logRetryCache) throws IOException {
Preconditions.checkArgument(!target.isEmpty(), "Target file name is empty");
Preconditions.checkArgument(srcs != null && srcs.length > 0,
"No sources given");
assert srcs != null;
FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
// We require all files be in the same directory
String trgParent =
target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
for (String s : srcs) {
String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
if (!srcParent.equals(trgParent)) {
throw new IllegalArgumentException(
"Sources and target are not in the same directory");
}
}
// write permission for the target
if (fsd.isPermissionEnabled()) {
FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.checkPathAccess(pc, target, FsAction.WRITE);
// and srcs
for(String aSrc: srcs) {
fsd.checkPathAccess(pc, aSrc, FsAction.READ); // read the file
fsd.checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete
}
}
// to make sure no two files are the same
Set<INode> si = new HashSet<INode>();
// we put the following prerequisite for the operation
// replication and blocks sizes should be the same for ALL the blocks
// check the target
final INodesInPath trgIip = fsd.getINodesInPath4Write(target);
if (fsd.getEZForPath(trgIip) != null) {
throw new HadoopIllegalArgumentException(
"concat can not be called for files in an encryption zone.");
}
final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(), target);
if(trgInode.isUnderConstruction()) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is under construction");
}
// per design target shouldn't be empty and all the blocks same size
if(trgInode.numBlocks() == 0) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is empty");
}
if (trgInode.isWithSnapshot()) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is in a snapshot");
}
long blockSize = trgInode.getPreferredBlockSize();
// check the end block to be full
final BlockInfo last = trgInode.getLastBlock();
if(blockSize != last.getNumBytes()) {
throw new HadoopIllegalArgumentException("The last block in " + target
+ " is not full; last block size = " + last.getNumBytes()
+ " but file block size = " + blockSize);
}
si.add(trgInode);
final short repl = trgInode.getFileReplication();
// now check the srcs
boolean endSrc = false; // final src file doesn't have to have full end block
for(int i=0; i< srcs.length; i++) {
String src = srcs[i];
if(i== srcs.length-1)
endSrc=true;
final INodeFile srcInode = INodeFile.valueOf(fsd.getINode4Write(src), src);
if(src.isEmpty()
|| srcInode.isUnderConstruction()
|| srcInode.numBlocks() == 0) {
throw new HadoopIllegalArgumentException("concat: source file " + src
+ " is invalid or empty or underConstruction");
}
// check replication and blocks size
if(repl != srcInode.getBlockReplication()) {
throw new HadoopIllegalArgumentException("concat: the source file "
+ src + " and the target file " + target
+ " should have the same replication: source replication is "
+ srcInode.getBlockReplication()
+ " but target replication is " + repl);
}
//boolean endBlock=false;
// verify that all the blocks are of the same length as target
// should be enough to check the end blocks
final BlockInfo[] srcBlocks = srcInode.getBlocks();
int idx = srcBlocks.length-1;
if(endSrc)
idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
throw new HadoopIllegalArgumentException("concat: the source file "
+ src + " and the target file " + target
+ " should have the same blocks sizes: target block size is "
+ blockSize + " but the size of source block " + idx + " is "
+ srcBlocks[idx].getNumBytes());
}
si.add(srcInode);
}
// make sure no two files are the same
if(si.size() < srcs.length+1) { // trg + srcs
// it means at least two files are the same
throw new HadoopIllegalArgumentException(
"concat: at least two of the source files are the same");
}
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
Arrays.toString(srcs) + " to " + target);
}
long timestamp = now();
fsd.writeLock();
try {
unprotectedConcat(fsd, target, srcs, timestamp);
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
return fsd.getAuditFileInfo(target, false);
}
/**
* Concat all the blocks from srcs to trg and delete the srcs files
* @param fsd FSDirectory
* @param target target file to move the blocks to
* @param srcs list of file to move the blocks from
*/
static void unprotectedConcat(
FSDirectory fsd, String target, String[] srcs, long timestamp)
throws IOException {
assert fsd.hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
}
// do the move
final INodesInPath trgIIP = fsd.getINodesInPath4Write(target, true);
final INode[] trgINodes = trgIIP.getINodes();
final INodeFile trgInode = trgIIP.getLastINode().asFile();
INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
for(int i = 0; i < srcs.length; i++) {
final INodesInPath iip = fsd.getINodesInPath4Write(srcs[i]);
final int latest = iip.getLatestSnapshotId();
final INode inode = iip.getLastINode();
// check if the file in the latest snapshot
if (inode.isInLatestSnapshot(latest)) {
throw new SnapshotException("Concat: the source file " + srcs[i]
+ " is in snapshot " + latest);
}
// check if the file has other references.
if (inode.isReference() && ((INodeReference.WithCount)
inode.asReference().getReferredINode()).getReferenceCount() > 1) {
throw new SnapshotException("Concat: the source file " + srcs[i]
+ " is referred by some other reference in some snapshot.");
}
allSrcInodes[i] = inode.asFile();
}
trgInode.concatBlocks(allSrcInodes);
// since we are in the same dir - we can use same parent to remove files
int count = 0;
for(INodeFile nodeToRemove: allSrcInodes) {
if(nodeToRemove == null) continue;
nodeToRemove.setBlocks(null);
trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
fsd.getINodeMap().remove(nodeToRemove);
count++;
}
trgInode.setModificationTime(timestamp, trgLatestSnapshot);
trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
// update quota on the parent directory ('count' files removed, 0 space)
FSDirectory.unprotectedUpdateCount(trgIIP, trgINodes.length - 1, -count, 0);
}
}

View File

@ -98,6 +98,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Both FSDirectory and FSNamesystem manage the state of the namespace.
@ -108,6 +110,7 @@ import org.apache.hadoop.security.UserGroupInformation;
**/
@InterfaceAudience.Private
public class FSDirectory implements Closeable {
static final Logger LOG = LoggerFactory.getLogger(FSDirectory.class);
private static INodeDirectory createRoot(FSNamesystem namesystem) {
final INodeDirectory r = new INodeDirectory(
INodeId.ROOT_INODE_ID,
@ -157,6 +160,8 @@ public class FSDirectory implements Closeable {
private final String fsOwnerShortUserName;
private final String supergroup;
private final FSEditLog editLog;
// utility methods to acquire and release read lock and write lock
void readLock() {
this.dirLock.readLock().lock();
@ -250,7 +255,7 @@ public class FSDirectory implements Closeable {
+ " times");
nameCache = new NameCache<ByteArray>(threshold);
namesystem = ns;
this.editLog = ns.getEditLog();
ezManager = new EncryptionZoneManager(this, conf);
}
@ -267,6 +272,14 @@ public class FSDirectory implements Closeable {
return rootDir;
}
boolean isPermissionEnabled() {
return isPermissionEnabled;
}
FSEditLog getEditLog() {
return editLog;
}
/**
* Shutdown the filestore
*/
@ -1173,81 +1186,6 @@ public class FSDirectory implements Closeable {
}
}
/**
* Concat all the blocks from srcs to trg and delete the srcs files
*/
void concat(String target, String[] srcs, long timestamp)
throws UnresolvedLinkException, QuotaExceededException,
SnapshotAccessControlException, SnapshotException {
writeLock();
try {
// actual move
unprotectedConcat(target, srcs, timestamp);
} finally {
writeUnlock();
}
}
/**
* Concat all the blocks from srcs to trg and delete the srcs files
* @param target target file to move the blocks to
* @param srcs list of file to move the blocks from
*/
void unprotectedConcat(String target, String [] srcs, long timestamp)
throws UnresolvedLinkException, QuotaExceededException,
SnapshotAccessControlException, SnapshotException {
assert hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
}
// do the move
final INodesInPath trgIIP = getINodesInPath4Write(target, true);
final INode[] trgINodes = trgIIP.getINodes();
final INodeFile trgInode = trgIIP.getLastINode().asFile();
INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
for(int i = 0; i < srcs.length; i++) {
final INodesInPath iip = getINodesInPath4Write(srcs[i]);
final int latest = iip.getLatestSnapshotId();
final INode inode = iip.getLastINode();
// check if the file in the latest snapshot
if (inode.isInLatestSnapshot(latest)) {
throw new SnapshotException("Concat: the source file " + srcs[i]
+ " is in snapshot " + latest);
}
// check if the file has other references.
if (inode.isReference() && ((INodeReference.WithCount)
inode.asReference().getReferredINode()).getReferenceCount() > 1) {
throw new SnapshotException("Concat: the source file " + srcs[i]
+ " is referred by some other reference in some snapshot.");
}
allSrcInodes[i] = inode.asFile();
}
trgInode.concatBlocks(allSrcInodes);
// since we are in the same dir - we can use same parent to remove files
int count = 0;
for(INodeFile nodeToRemove: allSrcInodes) {
if(nodeToRemove == null) continue;
nodeToRemove.setBlocks(null);
trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
inodeMap.remove(nodeToRemove);
count++;
}
trgInode.setModificationTime(timestamp, trgLatestSnapshot);
trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
// update quota on the parent directory ('count' files removed, 0 space)
unprotectedUpdateCount(trgIIP, trgINodes.length-1, -count, 0);
}
/**
* Delete the target directory and collect the blocks under it
*
@ -1790,8 +1728,7 @@ public class FSDirectory implements Closeable {
* updates quota without verification
* callers responsibility is to make sure quota is not exceeded
*/
private static void unprotectedUpdateCount(INodesInPath inodesInPath,
int numOfINodes, long nsDelta, long dsDelta) {
static void unprotectedUpdateCount(INodesInPath inodesInPath, int numOfINodes, long nsDelta, long dsDelta) {
final INode[] inodes = inodesInPath.getINodes();
for(int i=0; i < numOfINodes; i++) {
if (inodes[i].isQuotaSet()) { // a directory with quota
@ -3413,4 +3350,10 @@ public class FSDirectory implements Closeable {
}
}
}
HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
throws IOException {
return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
? getFileInfo(path, resolveSymlink, false, false) : null;
}
}

View File

@ -490,7 +490,7 @@ public class FSEditLogLoader {
srcs[i] =
renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
}
fsDir.unprotectedConcat(trg, srcs, concatDeleteOp.timestamp);
FSDirConcatOp.unprotectedConcat(fsDir, trg, srcs, concatDeleteOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,

View File

@ -258,8 +258,6 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RetryCache;
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.annotation.Metric;
@ -342,8 +340,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
throws IOException {
return (isAuditEnabled() && isExternalInvocation())
? dir.getFileInfo(path, resolveSymlink, false, false) : null;
return dir.getAuditFileInfo(path, resolveSymlink);
}
private void logAuditEvent(boolean succeeded, String cmd, String src)
@ -1944,175 +1941,26 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws IOException on error
*/
void concat(String target, String [] srcs, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
" to " + target);
}
try {
concatInt(target, srcs, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
throw e;
}
}
private void concatInt(String target, String [] srcs,
boolean logRetryCache) throws IOException, UnresolvedLinkException {
// verify args
if(target.isEmpty()) {
throw new IllegalArgumentException("Target file name is empty");
}
if(srcs == null || srcs.length == 0) {
throw new IllegalArgumentException("No sources given");
}
// We require all files be in the same directory
String trgParent =
target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
for (String s : srcs) {
String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
if (!srcParent.equals(trgParent)) {
throw new IllegalArgumentException(
"Sources and target are not in the same directory");
}
}
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
throws IOException {
checkOperation(OperationCategory.WRITE);
waitForLoadingFSImage();
HdfsFileStatus stat = null;
boolean success = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot concat " + target);
concatInternal(pc, target, srcs, logRetryCache);
resultingStat = getAuditFileInfo(target, false);
stat = FSDirConcatOp.concat(dir, target, srcs, logRetryCache);
success = true;
} finally {
writeUnlock();
if (success) {
getEditLog().logSync();
}
logAuditEvent(success, "concat", Arrays.toString(srcs), target, stat);
}
getEditLog().logSync();
logAuditEvent(true, "concat", Arrays.toString(srcs), target, resultingStat);
}
/** See {@link #concat(String, String[])} */
private void concatInternal(FSPermissionChecker pc, String target,
String[] srcs, boolean logRetryCache) throws IOException,
UnresolvedLinkException {
assert hasWriteLock();
// write permission for the target
if (isPermissionEnabled) {
checkPathAccess(pc, target, FsAction.WRITE);
// and srcs
for(String aSrc: srcs) {
checkPathAccess(pc, aSrc, FsAction.READ); // read the file
checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete
}
}
// to make sure no two files are the same
Set<INode> si = new HashSet<INode>();
// we put the following prerequisite for the operation
// replication and blocks sizes should be the same for ALL the blocks
// check the target
final INodesInPath trgIip = dir.getINodesInPath4Write(target);
if (dir.getEZForPath(trgIip) != null) {
throw new HadoopIllegalArgumentException(
"concat can not be called for files in an encryption zone.");
}
final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(),
target);
if(trgInode.isUnderConstruction()) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is under construction");
}
// per design target shouldn't be empty and all the blocks same size
if(trgInode.numBlocks() == 0) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is empty");
}
if (trgInode.isWithSnapshot()) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is in a snapshot");
}
long blockSize = trgInode.getPreferredBlockSize();
// check the end block to be full
final BlockInfo last = trgInode.getLastBlock();
if(blockSize != last.getNumBytes()) {
throw new HadoopIllegalArgumentException("The last block in " + target
+ " is not full; last block size = " + last.getNumBytes()
+ " but file block size = " + blockSize);
}
si.add(trgInode);
final short repl = trgInode.getFileReplication();
// now check the srcs
boolean endSrc = false; // final src file doesn't have to have full end block
for(int i=0; i<srcs.length; i++) {
String src = srcs[i];
if(i==srcs.length-1)
endSrc=true;
final INodeFile srcInode = INodeFile.valueOf(dir.getINode4Write(src), src);
if(src.isEmpty()
|| srcInode.isUnderConstruction()
|| srcInode.numBlocks() == 0) {
throw new HadoopIllegalArgumentException("concat: source file " + src
+ " is invalid or empty or underConstruction");
}
// check replication and blocks size
if(repl != srcInode.getBlockReplication()) {
throw new HadoopIllegalArgumentException("concat: the source file "
+ src + " and the target file " + target
+ " should have the same replication: source replication is "
+ srcInode.getBlockReplication()
+ " but target replication is " + repl);
}
//boolean endBlock=false;
// verify that all the blocks are of the same length as target
// should be enough to check the end blocks
final BlockInfo[] srcBlocks = srcInode.getBlocks();
int idx = srcBlocks.length-1;
if(endSrc)
idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
throw new HadoopIllegalArgumentException("concat: the source file "
+ src + " and the target file " + target
+ " should have the same blocks sizes: target block size is "
+ blockSize + " but the size of source block " + idx + " is "
+ srcBlocks[idx].getNumBytes());
}
si.add(srcInode);
}
// make sure no two files are the same
if(si.size() < srcs.length+1) { // trg + srcs
// it means at least two files are the same
throw new HadoopIllegalArgumentException(
"concat: at least two of the source files are the same");
}
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
Arrays.toString(srcs) + " to " + target);
}
long timestamp = now();
dir.concat(target, srcs, timestamp);
getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
}
/**
* stores the modification and access time for this inode.
* The access time is precise up to an hour. The transaction, if needed, is
@ -7240,7 +7088,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* Client invoked methods are invoked over RPC and will be in
* RPC call context even if the client exits.
*/
private boolean isExternalInvocation() {
boolean isExternalInvocation() {
return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
}