diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2ba21fc2e9c..e2116b0ee36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -393,6 +393,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7419. Improve error messages for DataNode hot swap drive feature
     (Lei Xu via Colin P. Mccabe)
 
+    HDFS-7436. Consolidate implementation of concat(). (wheat9)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
new file mode 100644
index 00000000000..12feb33e045
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.hadoop.util.Time.now;
+
+class FSDirConcatOp {
+  static HdfsFileStatus concat(
+      FSDirectory fsd, String target, String[] srcs,
+      boolean logRetryCache) throws IOException {
+    Preconditions.checkArgument(!target.isEmpty(), "Target file name is empty");
+    Preconditions.checkArgument(srcs != null && srcs.length > 0,
+        "No sources given");
+    assert srcs != null;
+
+    FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
+    // We require all files be in the same directory
+    String trgParent =
+        target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
+    for (String s : srcs) {
+      String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
+      if (!srcParent.equals(trgParent)) {
+        throw new IllegalArgumentException(
+            "Sources and target are not in the same directory");
+      }
+    }
+
+    // write permission for the target
+    if (fsd.isPermissionEnabled()) {
+      FSPermissionChecker pc = fsd.getPermissionChecker();
+      fsd.checkPathAccess(pc, target, FsAction.WRITE);
+
+      // and srcs
+      for(String aSrc: srcs) {
+        fsd.checkPathAccess(pc, aSrc, FsAction.READ); // read the file
+        fsd.checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete
+      }
+    }
+
+    // to make sure no two files are the same
+    Set<INode> si = new HashSet<INode>();
+
+    // we put the following prerequisite for the operation
+    // replication and blocks sizes should be the same for ALL the blocks
+
+    // check the target
+    final INodesInPath trgIip = fsd.getINodesInPath4Write(target);
+    if (fsd.getEZForPath(trgIip) != null) {
+      throw new HadoopIllegalArgumentException(
+          "concat can not be called for files in an encryption zone.");
+    }
+    final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(), target);
+    if(trgInode.isUnderConstruction()) {
+      throw new HadoopIllegalArgumentException("concat: target file "
+          + target + " is under construction");
+    }
+    // per design target shouldn't be empty and all the blocks same size
+    if(trgInode.numBlocks() == 0) {
+      throw new HadoopIllegalArgumentException("concat: target file "
+          + target + " is empty");
+    }
+    if (trgInode.isWithSnapshot()) {
+      throw new HadoopIllegalArgumentException("concat: target file "
+          + target + " is in a snapshot");
+    }
+
+    long blockSize = trgInode.getPreferredBlockSize();
+
+    // check the end block to be full
+    final BlockInfo last = trgInode.getLastBlock();
+    if(blockSize != last.getNumBytes()) {
+      throw new HadoopIllegalArgumentException("The last block in " + target
+          + " is not full; last block size = " + last.getNumBytes()
+          + " but file block size = " + blockSize);
+    }
+
+    si.add(trgInode);
+    final short repl = trgInode.getFileReplication();
+
+    // now check the srcs
+    boolean endSrc = false; // final src file doesn't have to have full end block
+    for(int i=0; i< srcs.length; i++) {
+      String src = srcs[i];
+      if(i== srcs.length-1)
+        endSrc=true;
+
+      final INodeFile srcInode = INodeFile.valueOf(fsd.getINode4Write(src), src);
+      if(src.isEmpty()
+          || srcInode.isUnderConstruction()
+          || srcInode.numBlocks() == 0) {
+        throw new HadoopIllegalArgumentException("concat: source file " + src
+            + " is invalid or empty or underConstruction");
+      }
+
+      // check replication and blocks size
+      if(repl != srcInode.getBlockReplication()) {
+        throw new HadoopIllegalArgumentException("concat: the source file "
+            + src + " and the target file " + target
+            + " should have the same replication: source replication is "
+            + srcInode.getBlockReplication()
+            + " but target replication is " + repl);
+      }
+
+      //boolean endBlock=false;
+      // verify that all the blocks are of the same length as target
+      // should be enough to check the end blocks
+      final BlockInfo[] srcBlocks = srcInode.getBlocks();
+      int idx = srcBlocks.length-1;
+      if(endSrc)
+        idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
+      if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
+        throw new HadoopIllegalArgumentException("concat: the source file "
+            + src + " and the target file " + target
+            + " should have the same blocks sizes: target block size is "
+            + blockSize + " but the size of source block " + idx + " is "
+            + srcBlocks[idx].getNumBytes());
+      }
+
+      si.add(srcInode);
+    }
+
+    // make sure no two files are the same
+    if(si.size() < srcs.length+1) { // trg + srcs
+      // it means at least two files are the same
+      throw new HadoopIllegalArgumentException(
+          "concat: at least two of the source files are the same");
+    }
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
+          Arrays.toString(srcs) + " to " + target);
+    }
+
+    long timestamp = now();
+    fsd.writeLock();
+    try {
+      unprotectedConcat(fsd, target, srcs, timestamp);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
+    return fsd.getAuditFileInfo(target, false);
+  }
+
+  /**
+   * Concat all the blocks from srcs to trg and delete the srcs files
+   * @param fsd FSDirectory
+   * @param target target file to move the blocks to
+   * @param srcs list of file to move the blocks from
+   */
+  static void unprotectedConcat(
+      FSDirectory fsd, String target, String[] srcs, long timestamp)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+    }
+    // do the move
+
+    final INodesInPath trgIIP = fsd.getINodesInPath4Write(target, true);
+    final INode[] trgINodes = trgIIP.getINodes();
+    final INodeFile trgInode = trgIIP.getLastINode().asFile();
+    INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
+    final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
+
+    final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
+    for(int i = 0; i < srcs.length; i++) {
+      final INodesInPath iip = fsd.getINodesInPath4Write(srcs[i]);
+      final int latest = iip.getLatestSnapshotId();
+      final INode inode = iip.getLastINode();
+
+      // check if the file in the latest snapshot
+      if (inode.isInLatestSnapshot(latest)) {
+        throw new SnapshotException("Concat: the source file " + srcs[i]
+            + " is in snapshot " + latest);
+      }
+
+      // check if the file has other references.
+      if (inode.isReference() && ((INodeReference.WithCount)
+          inode.asReference().getReferredINode()).getReferenceCount() > 1) {
+        throw new SnapshotException("Concat: the source file " + srcs[i]
+            + " is referred by some other reference in some snapshot.");
+      }
+
+      allSrcInodes[i] = inode.asFile();
+    }
+    trgInode.concatBlocks(allSrcInodes);
+
+    // since we are in the same dir - we can use same parent to remove files
+    int count = 0;
+    for(INodeFile nodeToRemove: allSrcInodes) {
+      if(nodeToRemove == null) continue;
+
+      nodeToRemove.setBlocks(null);
+      trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
+      fsd.getINodeMap().remove(nodeToRemove);
+      count++;
+    }
+
+    trgInode.setModificationTime(timestamp, trgLatestSnapshot);
+    trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
+    // update quota on the parent directory ('count' files removed, 0 space)
+    FSDirectory.unprotectedUpdateCount(trgIIP, trgINodes.length - 1, -count, 0);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 84e50283418..0a94849cac9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -98,6 +98,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Both FSDirectory and FSNamesystem manage the state of the namespace.
@@ -108,6 +110,7 @@ import org.apache.hadoop.security.UserGroupInformation;
  **/
 @InterfaceAudience.Private
 public class FSDirectory implements Closeable {
+  static final Logger LOG = LoggerFactory.getLogger(FSDirectory.class);
   private static INodeDirectory createRoot(FSNamesystem namesystem) {
     final INodeDirectory r = new INodeDirectory(
         INodeId.ROOT_INODE_ID,
@@ -157,6 +160,8 @@ public class FSDirectory implements Closeable {
   private final String fsOwnerShortUserName;
   private final String supergroup;
+  private final FSEditLog editLog;
+
   // utility methods to acquire and release read lock and write lock
   void readLock() {
     this.dirLock.readLock().lock();
   }
@@ -250,7 +255,7 @@ public class FSDirectory implements Closeable {
           + " times");
     nameCache = new NameCache<ByteArray>(threshold);
     namesystem = ns;
-
+    this.editLog = ns.getEditLog();
     ezManager = new EncryptionZoneManager(this, conf);
   }
 
@@ -267,6 +272,14 @@ public class FSDirectory implements Closeable {
     return rootDir;
   }
 
+  boolean isPermissionEnabled() {
+    return isPermissionEnabled;
+  }
+
+  FSEditLog getEditLog() {
+    return editLog;
+  }
+
   /**
    * Shutdown the filestore
    */
@@ -1173,81 +1186,6 @@ public class FSDirectory implements Closeable {
     }
   }
 
-  /**
-   * Concat all the blocks from srcs to trg and delete the srcs files
-   */
-  void concat(String target, String[] srcs, long timestamp)
-      throws UnresolvedLinkException, QuotaExceededException,
-      SnapshotAccessControlException, SnapshotException {
-    writeLock();
-    try {
-      // actual move
-      unprotectedConcat(target, srcs, timestamp);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
-   * Concat all the blocks from srcs to trg and delete the srcs files
-   * @param target target file to move the blocks to
-   * @param srcs list of file to move the blocks from
-   */
-  void unprotectedConcat(String target, String [] srcs, long timestamp)
-      throws UnresolvedLinkException, QuotaExceededException,
-      SnapshotAccessControlException, SnapshotException {
-    assert hasWriteLock();
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
-    }
-    // do the move
-
-    final INodesInPath trgIIP = getINodesInPath4Write(target, true);
-    final INode[] trgINodes = trgIIP.getINodes();
-    final INodeFile trgInode = trgIIP.getLastINode().asFile();
-    INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
-    final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
-
-    final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
-    for(int i = 0; i < srcs.length; i++) {
-      final INodesInPath iip = getINodesInPath4Write(srcs[i]);
-      final int latest = iip.getLatestSnapshotId();
-      final INode inode = iip.getLastINode();
-
-      // check if the file in the latest snapshot
-      if (inode.isInLatestSnapshot(latest)) {
-        throw new SnapshotException("Concat: the source file " + srcs[i]
-            + " is in snapshot " + latest);
-      }
-
-      // check if the file has other references.
-      if (inode.isReference() && ((INodeReference.WithCount)
-          inode.asReference().getReferredINode()).getReferenceCount() > 1) {
-        throw new SnapshotException("Concat: the source file " + srcs[i]
-            + " is referred by some other reference in some snapshot.");
-      }
-
-      allSrcInodes[i] = inode.asFile();
-    }
-    trgInode.concatBlocks(allSrcInodes);
-
-    // since we are in the same dir - we can use same parent to remove files
-    int count = 0;
-    for(INodeFile nodeToRemove: allSrcInodes) {
-      if(nodeToRemove == null) continue;
-
-      nodeToRemove.setBlocks(null);
-      trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
-      inodeMap.remove(nodeToRemove);
-      count++;
-    }
-
-    trgInode.setModificationTime(timestamp, trgLatestSnapshot);
-    trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
-    // update quota on the parent directory ('count' files removed, 0 space)
-    unprotectedUpdateCount(trgIIP, trgINodes.length-1, -count, 0);
-  }
-
   /**
    * Delete the target directory and collect the blocks under it
    *
@@ -1790,8 +1728,7 @@ public class FSDirectory implements Closeable {
    * updates quota without verification
    * callers responsibility is to make sure quota is not exceeded
    */
-  private static void unprotectedUpdateCount(INodesInPath inodesInPath,
-      int numOfINodes, long nsDelta, long dsDelta) {
+  static void unprotectedUpdateCount(INodesInPath inodesInPath, int numOfINodes, long nsDelta, long dsDelta) {
     final INode[] inodes = inodesInPath.getINodes();
     for(int i=0; i < numOfINodes; i++) {
       if (inodes[i].isQuotaSet()) { // a directory with quota
@@ -3413,4 +3350,10 @@ public class FSDirectory implements Closeable {
       }
     }
   }
+
+  HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
+      throws IOException {
+    return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
+        ? getFileInfo(path, resolveSymlink, false, false) : null;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 716768e4d71..95dfefcf251 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -490,7 +490,7 @@ public class FSEditLogLoader {
         srcs[i] =
             renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
       }
-      fsDir.unprotectedConcat(trg, srcs, concatDeleteOp.timestamp);
+      FSDirConcatOp.unprotectedConcat(fsDir, trg, srcs, concatDeleteOp.timestamp);
 
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 8dfc219a968..405990557eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -258,8 +258,6 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.RetryCache;
-import org.apache.hadoop.ipc.RetryCache.CacheEntry;
-import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.annotation.Metric;
@@ -342,8 +340,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
       throws IOException {
-    return (isAuditEnabled() && isExternalInvocation())
-        ? dir.getFileInfo(path, resolveSymlink, false, false) : null;
+    return dir.getAuditFileInfo(path, resolveSymlink);
   }
 
   private void logAuditEvent(boolean succeeded, String cmd, String src)
@@ -1944,175 +1941,26 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws IOException on error
    */
   void concat(String target, String [] srcs, boolean logRetryCache)
-      throws IOException, UnresolvedLinkException {
-    if(FSNamesystem.LOG.isDebugEnabled()) {
-      FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
-          " to " + target);
-    }
-
-    try {
-      concatInt(target, srcs, logRetryCache);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
-      throw e;
-    }
-  }
-
-  private void concatInt(String target, String [] srcs,
-      boolean logRetryCache) throws IOException, UnresolvedLinkException {
-    // verify args
-    if(target.isEmpty()) {
-      throw new IllegalArgumentException("Target file name is empty");
-    }
-    if(srcs == null || srcs.length == 0) {
-      throw new IllegalArgumentException("No sources given");
-    }
-
-    // We require all files be in the same directory
-    String trgParent =
-        target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
-    for (String s : srcs) {
-      String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
-      if (!srcParent.equals(trgParent)) {
-        throw new IllegalArgumentException(
-            "Sources and target are not in the same directory");
-      }
-    }
-
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
+      throws IOException {
     checkOperation(OperationCategory.WRITE);
     waitForLoadingFSImage();
+    HdfsFileStatus stat = null;
+    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot concat " + target);
-      concatInternal(pc, target, srcs, logRetryCache);
-      resultingStat = getAuditFileInfo(target, false);
+      stat = FSDirConcatOp.concat(dir, target, srcs, logRetryCache);
+      success = true;
     } finally {
       writeUnlock();
+      if (success) {
+        getEditLog().logSync();
+      }
+      logAuditEvent(success, "concat", Arrays.toString(srcs), target, stat);
     }
-    getEditLog().logSync();
-    logAuditEvent(true, "concat", Arrays.toString(srcs), target, resultingStat);
   }
 
-  /** See {@link #concat(String, String[])} */
-  private void concatInternal(FSPermissionChecker pc, String target,
-      String[] srcs, boolean logRetryCache) throws IOException,
-      UnresolvedLinkException {
-    assert hasWriteLock();
-
-    // write permission for the target
-    if (isPermissionEnabled) {
-      checkPathAccess(pc, target, FsAction.WRITE);
-
-      // and srcs
-      for(String aSrc: srcs) {
-        checkPathAccess(pc, aSrc, FsAction.READ); // read the file
-        checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete
-      }
-    }
-
-    // to make sure no two files are the same
-    Set<INode> si = new HashSet<INode>();
-
-    // we put the following prerequisite for the operation
-    // replication and blocks sizes should be the same for ALL the blocks
-
-    // check the target
-    final INodesInPath trgIip = dir.getINodesInPath4Write(target);
-    if (dir.getEZForPath(trgIip) != null) {
-      throw new HadoopIllegalArgumentException(
-          "concat can not be called for files in an encryption zone.");
-    }
-    final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(),
-        target);
-    if(trgInode.isUnderConstruction()) {
-      throw new HadoopIllegalArgumentException("concat: target file "
-          + target + " is under construction");
-    }
-    // per design target shouldn't be empty and all the blocks same size
-    if(trgInode.numBlocks() == 0) {
-      throw new HadoopIllegalArgumentException("concat: target file "
-          + target + " is empty");
-    }
-    if (trgInode.isWithSnapshot()) {
-      throw new HadoopIllegalArgumentException("concat: target file "
-          + target + " is in a snapshot");
-    }
-
-    long blockSize = trgInode.getPreferredBlockSize();
-
-    // check the end block to be full
-    final BlockInfo last = trgInode.getLastBlock();
-    if(blockSize != last.getNumBytes()) {
-      throw new HadoopIllegalArgumentException("The last block in " + target
-          + " is not full; last block size = " + last.getNumBytes()
-          + " but file block size = " + blockSize);
-    }
-
-    si.add(trgInode);
-    final short repl = trgInode.getFileReplication();
-
-    // now check the srcs
-    boolean endSrc = false; // final src file doesn't have to have full end block
-    for(int i=0; i< srcs.length; i++) {
-      String src = srcs[i];
-      if(i== srcs.length-1)
-        endSrc=true;
-
-      final INodeFile srcInode = INodeFile.valueOf(dir.getINode4Write(src), src);
-      if(src.isEmpty()
-          || srcInode.isUnderConstruction()
-          || srcInode.numBlocks() == 0) {
-        throw new HadoopIllegalArgumentException("concat: source file " + src
-            + " is invalid or empty or underConstruction");
-      }
-
-      // check replication and blocks size
-      if(repl != srcInode.getBlockReplication()) {
-        throw new HadoopIllegalArgumentException("concat: the source file "
-            + src + " and the target file " + target
-            + " should have the same replication: source replication is "
-            + srcInode.getBlockReplication()
-            + " but target replication is " + repl);
-      }
-
-      //boolean endBlock=false;
-      // verify that all the blocks are of the same length as target
-      // should be enough to check the end blocks
-      final BlockInfo[] srcBlocks = srcInode.getBlocks();
-      int idx = srcBlocks.length-1;
-      if(endSrc)
-        idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
-      if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
-        throw new HadoopIllegalArgumentException("concat: the source file "
-            + src + " and the target file " + target
-            + " should have the same blocks sizes: target block size is "
-            + blockSize + " but the size of source block " + idx + " is "
-            + srcBlocks[idx].getNumBytes());
-      }
-
-      si.add(srcInode);
-    }
-
-    // make sure no two files are the same
-    if(si.size() < srcs.length+1) { // trg + srcs
-      // it means at least two files are the same
-      throw new HadoopIllegalArgumentException(
-          "concat: at least two of the source files are the same");
-    }
-
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
-          Arrays.toString(srcs) + " to " + target);
-    }
-
-    long timestamp = now();
-    dir.concat(target, srcs, timestamp);
-    getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
-  }
-
   /**
    * stores the modification and access time for this inode.
    * The access time is precise up to an hour. The transaction, if needed, is
@@ -7240,7 +7088,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * Client invoked methods are invoked over RPC and will be in
    * RPC call context even if the client exits.
    */
-  private boolean isExternalInvocation() {
+  boolean isExternalInvocation() {
     return Server.isRpcInvocation() ||
         NamenodeWebHdfsMethods.isWebHdfsInvocation();
   }