HDFS-7436. Consolidate implementation of concat(). Contributed by Haohui Mai.
This commit is contained in:
parent
57d62d4ded
commit
9d462fb60c
|
@ -136,6 +136,8 @@ Release 2.7.0 - UNRELEASED
|
|||
HDFS-7419. Improve error messages for DataNode hot swap drive feature (Lei
|
||||
Xu via Colin P. Mccabe)
|
||||
|
||||
HDFS-7436. Consolidate implementation of concat(). (wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -0,0 +1,233 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
class FSDirConcatOp {
|
||||
static HdfsFileStatus concat(
|
||||
FSDirectory fsd, String target, String[] srcs,
|
||||
boolean logRetryCache) throws IOException {
|
||||
Preconditions.checkArgument(!target.isEmpty(), "Target file name is empty");
|
||||
Preconditions.checkArgument(srcs != null && srcs.length > 0,
|
||||
"No sources given");
|
||||
assert srcs != null;
|
||||
|
||||
FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
|
||||
// We require all files be in the same directory
|
||||
String trgParent =
|
||||
target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
|
||||
for (String s : srcs) {
|
||||
String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
|
||||
if (!srcParent.equals(trgParent)) {
|
||||
throw new IllegalArgumentException(
|
||||
"Sources and target are not in the same directory");
|
||||
}
|
||||
}
|
||||
|
||||
// write permission for the target
|
||||
if (fsd.isPermissionEnabled()) {
|
||||
FSPermissionChecker pc = fsd.getPermissionChecker();
|
||||
fsd.checkPathAccess(pc, target, FsAction.WRITE);
|
||||
|
||||
// and srcs
|
||||
for(String aSrc: srcs) {
|
||||
fsd.checkPathAccess(pc, aSrc, FsAction.READ); // read the file
|
||||
fsd.checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete
|
||||
}
|
||||
}
|
||||
|
||||
// to make sure no two files are the same
|
||||
Set<INode> si = new HashSet<INode>();
|
||||
|
||||
// we put the following prerequisite for the operation
|
||||
// replication and blocks sizes should be the same for ALL the blocks
|
||||
|
||||
// check the target
|
||||
final INodesInPath trgIip = fsd.getINodesInPath4Write(target);
|
||||
if (fsd.getEZForPath(trgIip) != null) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"concat can not be called for files in an encryption zone.");
|
||||
}
|
||||
final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(), target);
|
||||
if(trgInode.isUnderConstruction()) {
|
||||
throw new HadoopIllegalArgumentException("concat: target file "
|
||||
+ target + " is under construction");
|
||||
}
|
||||
// per design target shouldn't be empty and all the blocks same size
|
||||
if(trgInode.numBlocks() == 0) {
|
||||
throw new HadoopIllegalArgumentException("concat: target file "
|
||||
+ target + " is empty");
|
||||
}
|
||||
if (trgInode.isWithSnapshot()) {
|
||||
throw new HadoopIllegalArgumentException("concat: target file "
|
||||
+ target + " is in a snapshot");
|
||||
}
|
||||
|
||||
long blockSize = trgInode.getPreferredBlockSize();
|
||||
|
||||
// check the end block to be full
|
||||
final BlockInfo last = trgInode.getLastBlock();
|
||||
if(blockSize != last.getNumBytes()) {
|
||||
throw new HadoopIllegalArgumentException("The last block in " + target
|
||||
+ " is not full; last block size = " + last.getNumBytes()
|
||||
+ " but file block size = " + blockSize);
|
||||
}
|
||||
|
||||
si.add(trgInode);
|
||||
final short repl = trgInode.getFileReplication();
|
||||
|
||||
// now check the srcs
|
||||
boolean endSrc = false; // final src file doesn't have to have full end block
|
||||
for(int i=0; i< srcs.length; i++) {
|
||||
String src = srcs[i];
|
||||
if(i== srcs.length-1)
|
||||
endSrc=true;
|
||||
|
||||
final INodeFile srcInode = INodeFile.valueOf(fsd.getINode4Write(src), src);
|
||||
if(src.isEmpty()
|
||||
|| srcInode.isUnderConstruction()
|
||||
|| srcInode.numBlocks() == 0) {
|
||||
throw new HadoopIllegalArgumentException("concat: source file " + src
|
||||
+ " is invalid or empty or underConstruction");
|
||||
}
|
||||
|
||||
// check replication and blocks size
|
||||
if(repl != srcInode.getBlockReplication()) {
|
||||
throw new HadoopIllegalArgumentException("concat: the source file "
|
||||
+ src + " and the target file " + target
|
||||
+ " should have the same replication: source replication is "
|
||||
+ srcInode.getBlockReplication()
|
||||
+ " but target replication is " + repl);
|
||||
}
|
||||
|
||||
//boolean endBlock=false;
|
||||
// verify that all the blocks are of the same length as target
|
||||
// should be enough to check the end blocks
|
||||
final BlockInfo[] srcBlocks = srcInode.getBlocks();
|
||||
int idx = srcBlocks.length-1;
|
||||
if(endSrc)
|
||||
idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
|
||||
if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
|
||||
throw new HadoopIllegalArgumentException("concat: the source file "
|
||||
+ src + " and the target file " + target
|
||||
+ " should have the same blocks sizes: target block size is "
|
||||
+ blockSize + " but the size of source block " + idx + " is "
|
||||
+ srcBlocks[idx].getNumBytes());
|
||||
}
|
||||
|
||||
si.add(srcInode);
|
||||
}
|
||||
|
||||
// make sure no two files are the same
|
||||
if(si.size() < srcs.length+1) { // trg + srcs
|
||||
// it means at least two files are the same
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"concat: at least two of the source files are the same");
|
||||
}
|
||||
|
||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
|
||||
Arrays.toString(srcs) + " to " + target);
|
||||
}
|
||||
|
||||
long timestamp = now();
|
||||
fsd.writeLock();
|
||||
try {
|
||||
unprotectedConcat(fsd, target, srcs, timestamp);
|
||||
} finally {
|
||||
fsd.writeUnlock();
|
||||
}
|
||||
fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
|
||||
return fsd.getAuditFileInfo(target, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Concat all the blocks from srcs to trg and delete the srcs files
|
||||
* @param fsd FSDirectory
|
||||
* @param target target file to move the blocks to
|
||||
* @param srcs list of file to move the blocks from
|
||||
*/
|
||||
static void unprotectedConcat(
|
||||
FSDirectory fsd, String target, String[] srcs, long timestamp)
|
||||
throws IOException {
|
||||
assert fsd.hasWriteLock();
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
|
||||
}
|
||||
// do the move
|
||||
|
||||
final INodesInPath trgIIP = fsd.getINodesInPath4Write(target, true);
|
||||
final INode[] trgINodes = trgIIP.getINodes();
|
||||
final INodeFile trgInode = trgIIP.getLastINode().asFile();
|
||||
INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
|
||||
final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
|
||||
|
||||
final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
|
||||
for(int i = 0; i < srcs.length; i++) {
|
||||
final INodesInPath iip = fsd.getINodesInPath4Write(srcs[i]);
|
||||
final int latest = iip.getLatestSnapshotId();
|
||||
final INode inode = iip.getLastINode();
|
||||
|
||||
// check if the file in the latest snapshot
|
||||
if (inode.isInLatestSnapshot(latest)) {
|
||||
throw new SnapshotException("Concat: the source file " + srcs[i]
|
||||
+ " is in snapshot " + latest);
|
||||
}
|
||||
|
||||
// check if the file has other references.
|
||||
if (inode.isReference() && ((INodeReference.WithCount)
|
||||
inode.asReference().getReferredINode()).getReferenceCount() > 1) {
|
||||
throw new SnapshotException("Concat: the source file " + srcs[i]
|
||||
+ " is referred by some other reference in some snapshot.");
|
||||
}
|
||||
|
||||
allSrcInodes[i] = inode.asFile();
|
||||
}
|
||||
trgInode.concatBlocks(allSrcInodes);
|
||||
|
||||
// since we are in the same dir - we can use same parent to remove files
|
||||
int count = 0;
|
||||
for(INodeFile nodeToRemove: allSrcInodes) {
|
||||
if(nodeToRemove == null) continue;
|
||||
|
||||
nodeToRemove.setBlocks(null);
|
||||
trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
|
||||
fsd.getINodeMap().remove(nodeToRemove);
|
||||
count++;
|
||||
}
|
||||
|
||||
trgInode.setModificationTime(timestamp, trgLatestSnapshot);
|
||||
trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
|
||||
// update quota on the parent directory ('count' files removed, 0 space)
|
||||
FSDirectory.unprotectedUpdateCount(trgIIP, trgINodes.length - 1, -count, 0);
|
||||
}
|
||||
}
|
|
@ -98,6 +98,8 @@ import com.google.common.base.Preconditions;
|
|||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Both FSDirectory and FSNamesystem manage the state of the namespace.
|
||||
|
@ -108,6 +110,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
**/
|
||||
@InterfaceAudience.Private
|
||||
public class FSDirectory implements Closeable {
|
||||
static final Logger LOG = LoggerFactory.getLogger(FSDirectory.class);
|
||||
private static INodeDirectory createRoot(FSNamesystem namesystem) {
|
||||
final INodeDirectory r = new INodeDirectory(
|
||||
INodeId.ROOT_INODE_ID,
|
||||
|
@ -157,6 +160,8 @@ public class FSDirectory implements Closeable {
|
|||
private final String fsOwnerShortUserName;
|
||||
private final String supergroup;
|
||||
|
||||
private final FSEditLog editLog;
|
||||
|
||||
// utility methods to acquire and release read lock and write lock
|
||||
void readLock() {
|
||||
this.dirLock.readLock().lock();
|
||||
|
@ -250,7 +255,7 @@ public class FSDirectory implements Closeable {
|
|||
+ " times");
|
||||
nameCache = new NameCache<ByteArray>(threshold);
|
||||
namesystem = ns;
|
||||
|
||||
this.editLog = ns.getEditLog();
|
||||
ezManager = new EncryptionZoneManager(this, conf);
|
||||
}
|
||||
|
||||
|
@ -267,6 +272,14 @@ public class FSDirectory implements Closeable {
|
|||
return rootDir;
|
||||
}
|
||||
|
||||
boolean isPermissionEnabled() {
|
||||
return isPermissionEnabled;
|
||||
}
|
||||
|
||||
FSEditLog getEditLog() {
|
||||
return editLog;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the filestore
|
||||
*/
|
||||
|
@ -1173,81 +1186,6 @@ public class FSDirectory implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Concat all the blocks from srcs to trg and delete the srcs files
|
||||
*/
|
||||
void concat(String target, String[] srcs, long timestamp)
|
||||
throws UnresolvedLinkException, QuotaExceededException,
|
||||
SnapshotAccessControlException, SnapshotException {
|
||||
writeLock();
|
||||
try {
|
||||
// actual move
|
||||
unprotectedConcat(target, srcs, timestamp);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Concat all the blocks from srcs to trg and delete the srcs files
|
||||
* @param target target file to move the blocks to
|
||||
* @param srcs list of file to move the blocks from
|
||||
*/
|
||||
void unprotectedConcat(String target, String [] srcs, long timestamp)
|
||||
throws UnresolvedLinkException, QuotaExceededException,
|
||||
SnapshotAccessControlException, SnapshotException {
|
||||
assert hasWriteLock();
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
|
||||
}
|
||||
// do the move
|
||||
|
||||
final INodesInPath trgIIP = getINodesInPath4Write(target, true);
|
||||
final INode[] trgINodes = trgIIP.getINodes();
|
||||
final INodeFile trgInode = trgIIP.getLastINode().asFile();
|
||||
INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
|
||||
final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
|
||||
|
||||
final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
|
||||
for(int i = 0; i < srcs.length; i++) {
|
||||
final INodesInPath iip = getINodesInPath4Write(srcs[i]);
|
||||
final int latest = iip.getLatestSnapshotId();
|
||||
final INode inode = iip.getLastINode();
|
||||
|
||||
// check if the file in the latest snapshot
|
||||
if (inode.isInLatestSnapshot(latest)) {
|
||||
throw new SnapshotException("Concat: the source file " + srcs[i]
|
||||
+ " is in snapshot " + latest);
|
||||
}
|
||||
|
||||
// check if the file has other references.
|
||||
if (inode.isReference() && ((INodeReference.WithCount)
|
||||
inode.asReference().getReferredINode()).getReferenceCount() > 1) {
|
||||
throw new SnapshotException("Concat: the source file " + srcs[i]
|
||||
+ " is referred by some other reference in some snapshot.");
|
||||
}
|
||||
|
||||
allSrcInodes[i] = inode.asFile();
|
||||
}
|
||||
trgInode.concatBlocks(allSrcInodes);
|
||||
|
||||
// since we are in the same dir - we can use same parent to remove files
|
||||
int count = 0;
|
||||
for(INodeFile nodeToRemove: allSrcInodes) {
|
||||
if(nodeToRemove == null) continue;
|
||||
|
||||
nodeToRemove.setBlocks(null);
|
||||
trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
|
||||
inodeMap.remove(nodeToRemove);
|
||||
count++;
|
||||
}
|
||||
|
||||
trgInode.setModificationTime(timestamp, trgLatestSnapshot);
|
||||
trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
|
||||
// update quota on the parent directory ('count' files removed, 0 space)
|
||||
unprotectedUpdateCount(trgIIP, trgINodes.length-1, -count, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the target directory and collect the blocks under it
|
||||
*
|
||||
|
@ -1790,8 +1728,7 @@ public class FSDirectory implements Closeable {
|
|||
* updates quota without verification
|
||||
* callers responsibility is to make sure quota is not exceeded
|
||||
*/
|
||||
private static void unprotectedUpdateCount(INodesInPath inodesInPath,
|
||||
int numOfINodes, long nsDelta, long dsDelta) {
|
||||
static void unprotectedUpdateCount(INodesInPath inodesInPath, int numOfINodes, long nsDelta, long dsDelta) {
|
||||
final INode[] inodes = inodesInPath.getINodes();
|
||||
for(int i=0; i < numOfINodes; i++) {
|
||||
if (inodes[i].isQuotaSet()) { // a directory with quota
|
||||
|
@ -3415,4 +3352,10 @@ public class FSDirectory implements Closeable {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
|
||||
throws IOException {
|
||||
return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
|
||||
? getFileInfo(path, resolveSymlink, false, false) : null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -491,7 +491,7 @@ public class FSEditLogLoader {
|
|||
srcs[i] =
|
||||
renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
|
||||
}
|
||||
fsDir.unprotectedConcat(trg, srcs, concatDeleteOp.timestamp);
|
||||
FSDirConcatOp.unprotectedConcat(fsDir, trg, srcs, concatDeleteOp.timestamp);
|
||||
|
||||
if (toAddRetryCache) {
|
||||
fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
|
||||
|
|
|
@ -262,8 +262,6 @@ import org.apache.hadoop.io.IOUtils;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.ipc.RetryCache;
|
||||
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
|
||||
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
|
@ -331,8 +329,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
|
||||
private HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
|
||||
throws IOException {
|
||||
return (isAuditEnabled() && isExternalInvocation())
|
||||
? dir.getFileInfo(path, resolveSymlink, false, false) : null;
|
||||
return dir.getAuditFileInfo(path, resolveSymlink);
|
||||
}
|
||||
|
||||
private void logAuditEvent(boolean succeeded, String cmd, String src)
|
||||
|
@ -1934,173 +1931,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @throws IOException on error
|
||||
*/
|
||||
void concat(String target, String [] srcs, boolean logRetryCache)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
if(FSNamesystem.LOG.isDebugEnabled()) {
|
||||
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
|
||||
" to " + target);
|
||||
}
|
||||
|
||||
try {
|
||||
concatInt(target, srcs, logRetryCache);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private void concatInt(String target, String [] srcs,
|
||||
boolean logRetryCache) throws IOException, UnresolvedLinkException {
|
||||
// verify args
|
||||
if(target.isEmpty()) {
|
||||
throw new IllegalArgumentException("Target file name is empty");
|
||||
}
|
||||
if(srcs == null || srcs.length == 0) {
|
||||
throw new IllegalArgumentException("No sources given");
|
||||
}
|
||||
|
||||
// We require all files be in the same directory
|
||||
String trgParent =
|
||||
target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
|
||||
for (String s : srcs) {
|
||||
String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
|
||||
if (!srcParent.equals(trgParent)) {
|
||||
throw new IllegalArgumentException(
|
||||
"Sources and target are not in the same directory");
|
||||
}
|
||||
}
|
||||
|
||||
HdfsFileStatus resultingStat = null;
|
||||
FSPermissionChecker pc = getPermissionChecker();
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
waitForLoadingFSImage();
|
||||
HdfsFileStatus stat = null;
|
||||
boolean success = false;
|
||||
writeLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkNameNodeSafeMode("Cannot concat " + target);
|
||||
concatInternal(pc, target, srcs, logRetryCache);
|
||||
resultingStat = getAuditFileInfo(target, false);
|
||||
stat = FSDirConcatOp.concat(dir, target, srcs, logRetryCache);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
getEditLog().logSync();
|
||||
logAuditEvent(true, "concat", Arrays.toString(srcs), target, resultingStat);
|
||||
}
|
||||
|
||||
/** See {@link #concat(String, String[])} */
|
||||
private void concatInternal(FSPermissionChecker pc, String target,
|
||||
String[] srcs, boolean logRetryCache) throws IOException,
|
||||
UnresolvedLinkException {
|
||||
assert hasWriteLock();
|
||||
|
||||
// write permission for the target
|
||||
if (isPermissionEnabled) {
|
||||
checkPathAccess(pc, target, FsAction.WRITE);
|
||||
|
||||
// and srcs
|
||||
for(String aSrc: srcs) {
|
||||
checkPathAccess(pc, aSrc, FsAction.READ); // read the file
|
||||
checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete
|
||||
if (success) {
|
||||
getEditLog().logSync();
|
||||
}
|
||||
logAuditEvent(success, "concat", Arrays.toString(srcs), target, stat);
|
||||
}
|
||||
|
||||
// to make sure no two files are the same
|
||||
Set<INode> si = new HashSet<INode>();
|
||||
|
||||
// we put the following prerequisite for the operation
|
||||
// replication and blocks sizes should be the same for ALL the blocks
|
||||
|
||||
// check the target
|
||||
final INodesInPath trgIip = dir.getINodesInPath4Write(target);
|
||||
if (dir.getEZForPath(trgIip) != null) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"concat can not be called for files in an encryption zone.");
|
||||
}
|
||||
final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(),
|
||||
target);
|
||||
if(trgInode.isUnderConstruction()) {
|
||||
throw new HadoopIllegalArgumentException("concat: target file "
|
||||
+ target + " is under construction");
|
||||
}
|
||||
// per design target shouldn't be empty and all the blocks same size
|
||||
if(trgInode.numBlocks() == 0) {
|
||||
throw new HadoopIllegalArgumentException("concat: target file "
|
||||
+ target + " is empty");
|
||||
}
|
||||
if (trgInode.isWithSnapshot()) {
|
||||
throw new HadoopIllegalArgumentException("concat: target file "
|
||||
+ target + " is in a snapshot");
|
||||
}
|
||||
|
||||
long blockSize = trgInode.getPreferredBlockSize();
|
||||
|
||||
// check the end block to be full
|
||||
final BlockInfo last = trgInode.getLastBlock();
|
||||
if(blockSize != last.getNumBytes()) {
|
||||
throw new HadoopIllegalArgumentException("The last block in " + target
|
||||
+ " is not full; last block size = " + last.getNumBytes()
|
||||
+ " but file block size = " + blockSize);
|
||||
}
|
||||
|
||||
si.add(trgInode);
|
||||
final short repl = trgInode.getFileReplication();
|
||||
|
||||
// now check the srcs
|
||||
boolean endSrc = false; // final src file doesn't have to have full end block
|
||||
for(int i=0; i<srcs.length; i++) {
|
||||
String src = srcs[i];
|
||||
if(i==srcs.length-1)
|
||||
endSrc=true;
|
||||
|
||||
final INodeFile srcInode = INodeFile.valueOf(dir.getINode4Write(src), src);
|
||||
if(src.isEmpty()
|
||||
|| srcInode.isUnderConstruction()
|
||||
|| srcInode.numBlocks() == 0) {
|
||||
throw new HadoopIllegalArgumentException("concat: source file " + src
|
||||
+ " is invalid or empty or underConstruction");
|
||||
}
|
||||
|
||||
// check replication and blocks size
|
||||
if(repl != srcInode.getBlockReplication()) {
|
||||
throw new HadoopIllegalArgumentException("concat: the source file "
|
||||
+ src + " and the target file " + target
|
||||
+ " should have the same replication: source replication is "
|
||||
+ srcInode.getBlockReplication()
|
||||
+ " but target replication is " + repl);
|
||||
}
|
||||
|
||||
//boolean endBlock=false;
|
||||
// verify that all the blocks are of the same length as target
|
||||
// should be enough to check the end blocks
|
||||
final BlockInfo[] srcBlocks = srcInode.getBlocks();
|
||||
int idx = srcBlocks.length-1;
|
||||
if(endSrc)
|
||||
idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
|
||||
if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
|
||||
throw new HadoopIllegalArgumentException("concat: the source file "
|
||||
+ src + " and the target file " + target
|
||||
+ " should have the same blocks sizes: target block size is "
|
||||
+ blockSize + " but the size of source block " + idx + " is "
|
||||
+ srcBlocks[idx].getNumBytes());
|
||||
}
|
||||
|
||||
si.add(srcInode);
|
||||
}
|
||||
|
||||
// make sure no two files are the same
|
||||
if(si.size() < srcs.length+1) { // trg + srcs
|
||||
// it means at least two files are the same
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"concat: at least two of the source files are the same");
|
||||
}
|
||||
|
||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
|
||||
Arrays.toString(srcs) + " to " + target);
|
||||
}
|
||||
|
||||
long timestamp = now();
|
||||
dir.concat(target, srcs, timestamp);
|
||||
getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -7240,7 +7088,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* Client invoked methods are invoked over RPC and will be in
|
||||
* RPC call context even if the client exits.
|
||||
*/
|
||||
private boolean isExternalInvocation() {
|
||||
boolean isExternalInvocation() {
|
||||
return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue