HDFS-5285. Flatten INodeFile hierarchy: Replace INodeFileUnderConstruction and INodeFileUnderConstructionWithSnapshot with FileUnderContructionFeature. Contributed by jing9

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1544389 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-11-22 01:39:02 +00:00
parent 33a8234040
commit ce68f410b0
25 changed files with 507 additions and 741 deletions

View File

@ -213,6 +213,10 @@ Trunk (Unreleased)
HDFS-5473. Consistent naming of user-visible caching classes and methods HDFS-5473. Consistent naming of user-visible caching classes and methods
(cmccabe) (cmccabe)
HDFS-5285. Flatten INodeFile hierarchy: Replace INodeFileUnderConstruction
and INodeFileUnderConstructionWithSnapshot with FileUnderContructionFeature.
(jing9 via szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)

View File

@ -64,4 +64,21 @@ public interface BlockCollection {
* Get the name of the collection. * Get the name of the collection.
*/ */
public String getName(); public String getName();
/**
* Set the block at the given index.
*/
public void setBlock(int index, BlockInfo blk);
/**
* Convert the last block of the collection to an under-construction block
* and set the locations.
*/
public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
DatanodeDescriptor[] locations) throws IOException;
/**
* @return whether the block collection is under construction.
*/
public boolean isUnderConstruction();
} }

View File

@ -560,7 +560,7 @@ public class BlockManager {
* @throws IOException if the block does not have at least a minimal number * @throws IOException if the block does not have at least a minimal number
* of replicas reported from data-nodes. * of replicas reported from data-nodes.
*/ */
public boolean commitOrCompleteLastBlock(MutableBlockCollection bc, public boolean commitOrCompleteLastBlock(BlockCollection bc,
Block commitBlock) throws IOException { Block commitBlock) throws IOException {
if(commitBlock == null) if(commitBlock == null)
return false; // not committing, this is a block allocation retry return false; // not committing, this is a block allocation retry
@ -583,7 +583,7 @@ public class BlockManager {
* @throws IOException if the block does not have at least a minimal number * @throws IOException if the block does not have at least a minimal number
* of replicas reported from data-nodes. * of replicas reported from data-nodes.
*/ */
private BlockInfo completeBlock(final MutableBlockCollection bc, private BlockInfo completeBlock(final BlockCollection bc,
final int blkIndex, boolean force) throws IOException { final int blkIndex, boolean force) throws IOException {
if(blkIndex < 0) if(blkIndex < 0)
return null; return null;
@ -616,7 +616,7 @@ public class BlockManager {
return blocksMap.replaceBlock(completeBlock); return blocksMap.replaceBlock(completeBlock);
} }
private BlockInfo completeBlock(final MutableBlockCollection bc, private BlockInfo completeBlock(final BlockCollection bc,
final BlockInfo block, boolean force) throws IOException { final BlockInfo block, boolean force) throws IOException {
BlockInfo[] fileBlocks = bc.getBlocks(); BlockInfo[] fileBlocks = bc.getBlocks();
for(int idx = 0; idx < fileBlocks.length; idx++) for(int idx = 0; idx < fileBlocks.length; idx++)
@ -631,7 +631,7 @@ public class BlockManager {
* regardless of whether enough replicas are present. This is necessary * regardless of whether enough replicas are present. This is necessary
* when tailing edit logs as a Standby. * when tailing edit logs as a Standby.
*/ */
public BlockInfo forceCompleteBlock(final MutableBlockCollection bc, public BlockInfo forceCompleteBlock(final BlockCollection bc,
final BlockInfoUnderConstruction block) throws IOException { final BlockInfoUnderConstruction block) throws IOException {
block.commitBlock(block); block.commitBlock(block);
return completeBlock(bc, block, true); return completeBlock(bc, block, true);
@ -652,7 +652,7 @@ public class BlockManager {
* @return the last block locations if the block is partial or null otherwise * @return the last block locations if the block is partial or null otherwise
*/ */
public LocatedBlock convertLastBlockToUnderConstruction( public LocatedBlock convertLastBlockToUnderConstruction(
MutableBlockCollection bc) throws IOException { BlockCollection bc) throws IOException {
BlockInfo oldBlock = bc.getLastBlock(); BlockInfo oldBlock = bc.getLastBlock();
if(oldBlock == null || if(oldBlock == null ||
bc.getPreferredBlockSize() == oldBlock.getNumBytes()) bc.getPreferredBlockSize() == oldBlock.getNumBytes())
@ -1209,7 +1209,7 @@ public class BlockManager {
// block should belong to a file // block should belong to a file
bc = blocksMap.getBlockCollection(block); bc = blocksMap.getBlockCollection(block);
// abandoned block or block reopened for append // abandoned block or block reopened for append
if(bc == null || bc instanceof MutableBlockCollection) { if(bc == null || bc.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications neededReplications.remove(block, priority); // remove from neededReplications
continue; continue;
} }
@ -1290,7 +1290,7 @@ public class BlockManager {
// block should belong to a file // block should belong to a file
bc = blocksMap.getBlockCollection(block); bc = blocksMap.getBlockCollection(block);
// abandoned block or block reopened for append // abandoned block or block reopened for append
if(bc == null || bc instanceof MutableBlockCollection) { if(bc == null || bc.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null; rw.targets = null;
continue; continue;
@ -2145,7 +2145,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
int numCurrentReplica = countLiveNodes(storedBlock); int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
&& numCurrentReplica >= minReplication) { && numCurrentReplica >= minReplication) {
completeBlock((MutableBlockCollection)storedBlock.getBlockCollection(), storedBlock, false); completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
} else if (storedBlock.isComplete()) { } else if (storedBlock.isComplete()) {
// check whether safe replication is reached for the block // check whether safe replication is reached for the block
// only complete blocks are counted towards that. // only complete blocks are counted towards that.
@ -2215,7 +2215,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED && if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
numLiveReplicas >= minReplication) { numLiveReplicas >= minReplication) {
storedBlock = completeBlock((MutableBlockCollection)bc, storedBlock, false); storedBlock = completeBlock(bc, storedBlock, false);
} else if (storedBlock.isComplete()) { } else if (storedBlock.isComplete()) {
// check whether safe replication is reached for the block // check whether safe replication is reached for the block
// only complete blocks are counted towards that // only complete blocks are counted towards that
@ -2226,7 +2226,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
} }
// if file is under construction, then done for now // if file is under construction, then done for now
if (bc instanceof MutableBlockCollection) { if (bc.isUnderConstruction()) {
return storedBlock; return storedBlock;
} }
@ -2835,7 +2835,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+ ", corrupt replicas: " + num.corruptReplicas() + ", corrupt replicas: " + num.corruptReplicas()
+ ", decommissioned replicas: " + num.decommissionedReplicas() + ", decommissioned replicas: " + num.decommissionedReplicas()
+ ", excess replicas: " + num.excessReplicas() + ", excess replicas: " + num.excessReplicas()
+ ", Is Open File: " + (bc instanceof MutableBlockCollection) + ", Is Open File: " + bc.isUnderConstruction()
+ ", Datanodes having this block: " + nodeList + ", Current Datanode: " + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+ srcNode + ", Is current datanode decommissioning: " + srcNode + ", Is current datanode decommissioning: "
+ srcNode.isDecommissionInProgress()); + srcNode.isDecommissionInProgress());
@ -2894,7 +2894,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) { if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
decommissionOnlyReplicas++; decommissionOnlyReplicas++;
} }
if (bc instanceof MutableBlockCollection) { if (bc.isUnderConstruction()) {
underReplicatedInOpenFiles++; underReplicatedInOpenFiles++;
} }
} }

View File

@ -276,13 +276,9 @@ public class FSDirectory implements Closeable {
* @throws UnresolvedLinkException * @throws UnresolvedLinkException
* @throws SnapshotAccessControlException * @throws SnapshotAccessControlException
*/ */
INodeFileUnderConstruction addFile(String path, INodeFile addFile(String path, PermissionStatus permissions,
PermissionStatus permissions, short replication, long preferredBlockSize, String clientName,
short replication, String clientMachine, DatanodeDescriptor clientNode)
long preferredBlockSize,
String clientName,
String clientMachine,
DatanodeDescriptor clientNode)
throws FileAlreadyExistsException, QuotaExceededException, throws FileAlreadyExistsException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException { UnresolvedLinkException, SnapshotAccessControlException {
waitForReady(); waitForReady();
@ -300,11 +296,11 @@ public class FSDirectory implements Closeable {
if (!mkdirs(parent.toString(), permissions, true, modTime)) { if (!mkdirs(parent.toString(), permissions, true, modTime)) {
return null; return null;
} }
INodeFileUnderConstruction newNode = new INodeFileUnderConstruction( INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null,
namesystem.allocateNewInodeId(), permissions, modTime, modTime, BlockInfo.EMPTY_ARRAY, replication,
permissions,replication, preferredBlockSize);
preferredBlockSize, modTime, clientName, newNode.toUnderConstruction(clientName, clientMachine, clientNode);
clientMachine, clientNode);
boolean added = false; boolean added = false;
writeLock(); writeLock();
try { try {
@ -336,8 +332,11 @@ public class FSDirectory implements Closeable {
final INodeFile newNode; final INodeFile newNode;
assert hasWriteLock(); assert hasWriteLock();
if (underConstruction) { if (underConstruction) {
newNode = new INodeFileUnderConstruction(id, permissions, replication, newNode = new INodeFile(id, null, permissions, modificationTime,
preferredBlockSize, modificationTime, clientName, clientMachine, null); modificationTime, BlockInfo.EMPTY_ARRAY, replication,
preferredBlockSize);
newNode.toUnderConstruction(clientName, clientMachine, null);
} else { } else {
newNode = new INodeFile(id, null, permissions, modificationTime, atime, newNode = new INodeFile(id, null, permissions, modificationTime, atime,
BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize); BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize);
@ -366,8 +365,8 @@ public class FSDirectory implements Closeable {
writeLock(); writeLock();
try { try {
final INodeFileUnderConstruction fileINode = final INodeFile fileINode = inodesInPath.getLastINode().asFile();
INodeFileUnderConstruction.valueOf(inodesInPath.getLastINode(), path); Preconditions.checkState(fileINode.isUnderConstruction());
// check quota limits and updated space consumed // check quota limits and updated space consumed
updateCount(inodesInPath, 0, fileINode.getBlockDiskspace(), true); updateCount(inodesInPath, 0, fileINode.getBlockDiskspace(), true);
@ -397,8 +396,8 @@ public class FSDirectory implements Closeable {
/** /**
* Persist the block list for the inode. * Persist the block list for the inode.
*/ */
void persistBlocks(String path, INodeFileUnderConstruction file, void persistBlocks(String path, INodeFile file, boolean logRetryCache) {
boolean logRetryCache) { Preconditions.checkArgument(file.isUnderConstruction());
waitForReady(); waitForReady();
writeLock(); writeLock();
@ -437,8 +436,9 @@ public class FSDirectory implements Closeable {
* Remove a block from the file. * Remove a block from the file.
* @return Whether the block exists in the corresponding file * @return Whether the block exists in the corresponding file
*/ */
boolean removeBlock(String path, INodeFileUnderConstruction fileNode, boolean removeBlock(String path, INodeFile fileNode, Block block)
Block block) throws IOException { throws IOException {
Preconditions.checkArgument(fileNode.isUnderConstruction());
waitForReady(); waitForReady();
writeLock(); writeLock();
@ -450,7 +450,8 @@ public class FSDirectory implements Closeable {
} }
boolean unprotectedRemoveBlock(String path, boolean unprotectedRemoveBlock(String path,
INodeFileUnderConstruction fileNode, Block block) throws IOException { INodeFile fileNode, Block block) throws IOException {
Preconditions.checkArgument(fileNode.isUnderConstruction());
// modify file-> block and blocksMap // modify file-> block and blocksMap
boolean removed = fileNode.removeLastBlock(block); boolean removed = fileNode.removeLastBlock(block);
if (!removed) { if (!removed) {
@ -1477,38 +1478,6 @@ public class FSDirectory implements Closeable {
} }
} }
/**
* Replaces the specified INodeFile with the specified one.
*/
void replaceINodeFile(String path, INodeFile oldnode,
INodeFile newnode) throws IOException {
writeLock();
try {
unprotectedReplaceINodeFile(path, oldnode, newnode);
} finally {
writeUnlock();
}
}
/** Replace an INodeFile and record modification for the latest snapshot. */
void unprotectedReplaceINodeFile(final String path, final INodeFile oldnode,
final INodeFile newnode) {
Preconditions.checkState(hasWriteLock());
oldnode.getParent().replaceChild(oldnode, newnode, inodeMap);
oldnode.clear();
/* Currently oldnode and newnode are assumed to contain the same
* blocks. Otherwise, blocks need to be removed from the blocksMap.
*/
int index = 0;
for (BlockInfo b : newnode.getBlocks()) {
BlockInfo info = getBlockManager().addBlockCollection(b, newnode);
newnode.setBlock(index, info); // inode refers to the block in BlocksMap
index++;
}
}
/** /**
* Get a partial listing of the indicated directory * Get a partial listing of the indicated directory
* *

View File

@ -680,8 +680,8 @@ public class FSEditLog implements LogsPurgeable {
* Add open lease record to edit log. * Add open lease record to edit log.
* Records the block locations of the last block. * Records the block locations of the last block.
*/ */
public void logOpenFile(String path, INodeFileUnderConstruction newNode, public void logOpenFile(String path, INodeFile newNode, boolean toLogRpcIds) {
boolean toLogRpcIds) { Preconditions.checkArgument(newNode.isUnderConstruction());
AddOp op = AddOp.getInstance(cache.get()) AddOp op = AddOp.getInstance(cache.get())
.setInodeId(newNode.getId()) .setInodeId(newNode.getId())
.setPath(path) .setPath(path)
@ -691,8 +691,8 @@ public class FSEditLog implements LogsPurgeable {
.setBlockSize(newNode.getPreferredBlockSize()) .setBlockSize(newNode.getPreferredBlockSize())
.setBlocks(newNode.getBlocks()) .setBlocks(newNode.getBlocks())
.setPermissionStatus(newNode.getPermissionStatus()) .setPermissionStatus(newNode.getPermissionStatus())
.setClientName(newNode.getClientName()) .setClientName(newNode.getFileUnderConstructionFeature().getClientName())
.setClientMachine(newNode.getClientMachine()); .setClientMachine(newNode.getFileUnderConstructionFeature().getClientMachine());
logRpcIds(op, toLogRpcIds); logRpcIds(op, toLogRpcIds);
logEdit(op); logEdit(op);
} }
@ -713,8 +713,8 @@ public class FSEditLog implements LogsPurgeable {
logEdit(op); logEdit(op);
} }
public void logUpdateBlocks(String path, INodeFileUnderConstruction file, public void logUpdateBlocks(String path, INodeFile file, boolean toLogRpcIds) {
boolean toLogRpcIds) { Preconditions.checkArgument(file.isUnderConstruction());
UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get()) UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
.setPath(path) .setPath(path)
.setBlocks(file.getBlocks()); .setBlocks(file.getBlocks());

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.util.ChunkedArrayList;
import org.apache.hadoop.hdfs.util.Holder; import org.apache.hadoop.hdfs.util.Holder;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
@ -369,15 +370,15 @@ public class FSEditLogLoader {
} }
final INodesInPath iip = fsDir.getLastINodeInPath(addCloseOp.path); final INodesInPath iip = fsDir.getLastINodeInPath(addCloseOp.path);
final INodeFile oldFile = INodeFile.valueOf(iip.getINode(0), addCloseOp.path); final INodeFile file = INodeFile.valueOf(iip.getINode(0), addCloseOp.path);
// Update the salient file attributes. // Update the salient file attributes.
oldFile.setAccessTime(addCloseOp.atime, null, fsDir.getINodeMap()); file.setAccessTime(addCloseOp.atime, null, fsDir.getINodeMap());
oldFile.setModificationTime(addCloseOp.mtime, null, fsDir.getINodeMap()); file.setModificationTime(addCloseOp.mtime, null, fsDir.getINodeMap());
updateBlocks(fsDir, addCloseOp, oldFile); updateBlocks(fsDir, addCloseOp, file);
// Now close the file // Now close the file
if (!oldFile.isUnderConstruction() && if (!file.isUnderConstruction() &&
logVersion <= LayoutVersion.BUGFIX_HDFS_2991_VERSION) { logVersion <= LayoutVersion.BUGFIX_HDFS_2991_VERSION) {
// There was a bug (HDFS-2991) in hadoop < 0.23.1 where OP_CLOSE // There was a bug (HDFS-2991) in hadoop < 0.23.1 where OP_CLOSE
// could show up twice in a row. But after that version, this // could show up twice in a row. But after that version, this
@ -387,11 +388,9 @@ public class FSEditLogLoader {
} }
// One might expect that you could use removeLease(holder, path) here, // One might expect that you could use removeLease(holder, path) here,
// but OP_CLOSE doesn't serialize the holder. So, remove by path. // but OP_CLOSE doesn't serialize the holder. So, remove by path.
if (oldFile.isUnderConstruction()) { if (file.isUnderConstruction()) {
INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path); fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
INodeFile newFile = ucFile.toINodeFile(ucFile.getModificationTime()); file.toCompleteFile(file.getModificationTime());
fsDir.unprotectedReplaceINodeFile(addCloseOp.path, ucFile, newFile);
} }
break; break;
} }
@ -564,9 +563,8 @@ public class FSEditLogLoader {
Lease lease = fsNamesys.leaseManager.getLease( Lease lease = fsNamesys.leaseManager.getLease(
reassignLeaseOp.leaseHolder); reassignLeaseOp.leaseHolder);
INodeFileUnderConstruction pendingFile = INodeFile pendingFile = fsDir.getINode(reassignLeaseOp.path).asFile();
INodeFileUnderConstruction.valueOf( Preconditions.checkState(pendingFile.isUnderConstruction());
fsDir.getINode(reassignLeaseOp.path), reassignLeaseOp.path);
fsNamesys.reassignLeaseInternal(lease, fsNamesys.reassignLeaseInternal(lease,
reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile); reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
break; break;
@ -751,9 +749,8 @@ public class FSEditLogLoader {
if (oldBlock instanceof BlockInfoUnderConstruction && if (oldBlock instanceof BlockInfoUnderConstruction &&
(!isLastBlock || op.shouldCompleteLastBlock())) { (!isLastBlock || op.shouldCompleteLastBlock())) {
changeMade = true; changeMade = true;
fsNamesys.getBlockManager().forceCompleteBlock( fsNamesys.getBlockManager().forceCompleteBlock(file,
(INodeFileUnderConstruction)file, (BlockInfoUnderConstruction) oldBlock);
(BlockInfoUnderConstruction)oldBlock);
} }
if (changeMade) { if (changeMade) {
// The state or gen-stamp of the block has changed. So, we may be // The state or gen-stamp of the block has changed. So, we may be
@ -774,8 +771,7 @@ public class FSEditLogLoader {
+ path); + path);
} }
Block oldBlock = oldBlocks[oldBlocks.length - 1]; Block oldBlock = oldBlocks[oldBlocks.length - 1];
boolean removed = fsDir.unprotectedRemoveBlock(path, boolean removed = fsDir.unprotectedRemoveBlock(path, file, oldBlock);
(INodeFileUnderConstruction) file, oldBlock);
if (!removed && !(op instanceof UpdateBlocksOp)) { if (!removed && !(op instanceof UpdateBlocksOp)) {
throw new IOException("Trying to delete non-existant block " + oldBlock); throw new IOException("Trying to delete non-existant block " + oldBlock);
} }

View File

@ -55,7 +55,6 @@ import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable; import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
@ -659,14 +658,11 @@ public class FSImageFormat {
// file // file
// read blocks // read blocks
BlockInfo[] blocks = null; BlockInfo[] blocks = new BlockInfo[numBlocks];
if (numBlocks >= 0) {
blocks = new BlockInfo[numBlocks];
for (int j = 0; j < numBlocks; j++) { for (int j = 0; j < numBlocks; j++) {
blocks[j] = new BlockInfo(replication); blocks[j] = new BlockInfo(replication);
blocks[j].readFields(in); blocks[j].readFields(in);
} }
}
String clientName = ""; String clientName = "";
String clientMachine = ""; String clientMachine = "";
@ -700,10 +696,9 @@ public class FSImageFormat {
final INodeFile file = new INodeFile(inodeId, localName, permissions, final INodeFile file = new INodeFile(inodeId, localName, permissions,
modificationTime, atime, blocks, replication, blockSize); modificationTime, atime, blocks, replication, blockSize);
if (underConstruction) { if (underConstruction) {
INodeFileUnderConstruction fileUC = new INodeFileUnderConstruction( file.toUnderConstruction(clientName, clientMachine, null);
file, clientName, clientMachine, null); return fileDiffs == null ? file : new INodeFileWithSnapshot(file,
return fileDiffs == null ? fileUC : fileDiffs);
new INodeFileUnderConstructionWithSnapshot(fileUC, fileDiffs);
} else { } else {
return fileDiffs == null ? file : return fileDiffs == null ? file :
new INodeFileWithSnapshot(file, fileDiffs); new INodeFileWithSnapshot(file, fileDiffs);
@ -829,8 +824,8 @@ public class FSImageFormat {
LOG.info("Number of files under construction = " + size); LOG.info("Number of files under construction = " + size);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
INodeFileUnderConstruction cons = FSImageSerialization INodeFile cons = FSImageSerialization.readINodeUnderConstruction(in,
.readINodeUnderConstruction(in, namesystem, getLayoutVersion()); namesystem, getLayoutVersion());
counter.increment(); counter.increment();
// verify that file exists in namespace // verify that file exists in namespace
@ -849,32 +844,20 @@ public class FSImageFormat {
oldnode = INodeFile.valueOf(iip.getINode(0), path); oldnode = INodeFile.valueOf(iip.getINode(0), path);
} }
cons.setLocalName(oldnode.getLocalNameBytes()); FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();
INodeReference parentRef = oldnode.getParentReference(); oldnode.toUnderConstruction(uc.getClientName(), uc.getClientMachine(),
if (parentRef != null) { uc.getClientNode());
cons.setParentReference(parentRef); if (oldnode.numBlocks() > 0) {
} else { BlockInfo ucBlock = cons.getLastBlock();
cons.setParent(oldnode.getParent()); // we do not replace the inode, just replace the last block of oldnode
} BlockInfo info = namesystem.getBlockManager().addBlockCollection(
ucBlock, oldnode);
if (oldnode instanceof INodeFileWithSnapshot) { oldnode.setBlock(oldnode.numBlocks() - 1, info);
cons = new INodeFileUnderConstructionWithSnapshot(cons,
((INodeFileWithSnapshot) oldnode).getDiffs());
} }
if (!inSnapshot) { if (!inSnapshot) {
fsDir.replaceINodeFile(path, oldnode, cons); namesystem.leaseManager.addLease(cons
namesystem.leaseManager.addLease(cons.getClientName(), path); .getFileUnderConstructionFeature().getClientName(), path);
} else {
if (parentRef != null) {
// replace oldnode with cons
parentRef.setReferredINode(cons);
} else {
// replace old node in its parent's children list and deleted list
oldnode.getParent().replaceChildFileInSnapshot(oldnode, cons);
namesystem.dir.addToInodeMap(cons);
updateBlocksMap(cons);
}
} }
} }
} }
@ -955,8 +938,8 @@ public class FSImageFormat {
private MD5Hash savedDigest; private MD5Hash savedDigest;
private final ReferenceMap referenceMap = new ReferenceMap(); private final ReferenceMap referenceMap = new ReferenceMap();
private final Map<Long, INodeFileUnderConstruction> snapshotUCMap = private final Map<Long, INodeFile> snapshotUCMap =
new HashMap<Long, INodeFileUnderConstruction>(); new HashMap<Long, INodeFile>();
/** @throws IllegalStateException if the instance has not yet saved an image */ /** @throws IllegalStateException if the instance has not yet saved an image */
private void checkSaved() { private void checkSaved() {
@ -1096,8 +1079,7 @@ public class FSImageFormat {
dirNum++; dirNum++;
} else if (inSnapshot && child.isFile() } else if (inSnapshot && child.isFile()
&& child.asFile().isUnderConstruction()) { && child.asFile().isUnderConstruction()) {
this.snapshotUCMap.put(child.getId(), this.snapshotUCMap.put(child.getId(), child.asFile());
(INodeFileUnderConstruction) child.asFile());
} }
if (i++ % 50 == 0) { if (i++ % 50 == 0) {
context.checkCancelled(); context.checkCancelled();

View File

@ -108,7 +108,7 @@ public class FSImageSerialization {
// Helper function that reads in an INodeUnderConstruction // Helper function that reads in an INodeUnderConstruction
// from the input stream // from the input stream
// //
static INodeFileUnderConstruction readINodeUnderConstruction( static INodeFile readINodeUnderConstruction(
DataInput in, FSNamesystem fsNamesys, int imgVersion) DataInput in, FSNamesystem fsNamesys, int imgVersion)
throws IOException { throws IOException {
byte[] name = readBytes(in); byte[] name = readBytes(in);
@ -141,25 +141,17 @@ public class FSImageSerialization {
int numLocs = in.readInt(); int numLocs = in.readInt();
assert numLocs == 0 : "Unexpected block locations"; assert numLocs == 0 : "Unexpected block locations";
return new INodeFileUnderConstruction(inodeId, INodeFile file = new INodeFile(inodeId, name, perm, modificationTime,
name, modificationTime, blocks, blockReplication, preferredBlockSize);
blockReplication, file.toUnderConstruction(clientName, clientMachine, null);
modificationTime, return file;
preferredBlockSize,
blocks,
perm,
clientName,
clientMachine,
null);
} }
// Helper function that writes an INodeUnderConstruction // Helper function that writes an INodeUnderConstruction
// into the input stream // into the input stream
// //
static void writeINodeUnderConstruction(DataOutputStream out, static void writeINodeUnderConstruction(DataOutputStream out, INodeFile cons,
INodeFileUnderConstruction cons, String path) throws IOException {
String path)
throws IOException {
writeString(path, out); writeString(path, out);
out.writeLong(cons.getId()); out.writeLong(cons.getId());
out.writeShort(cons.getFileReplication()); out.writeShort(cons.getFileReplication());
@ -169,8 +161,9 @@ public class FSImageSerialization {
writeBlocks(cons.getBlocks(), out); writeBlocks(cons.getBlocks(), out);
cons.getPermissionStatus().write(out); cons.getPermissionStatus().write(out);
writeString(cons.getClientName(), out); FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();
writeString(cons.getClientMachine(), out); writeString(uc.getClientName(), out);
writeString(uc.getClientMachine(), out);
out.writeInt(0); // do not store locations of last block out.writeInt(0); // do not store locations of last block
} }
@ -194,9 +187,9 @@ public class FSImageSerialization {
SnapshotFSImageFormat.saveFileDiffList(file, out); SnapshotFSImageFormat.saveFileDiffList(file, out);
if (writeUnderConstruction) { if (writeUnderConstruction) {
if (file instanceof INodeFileUnderConstruction) { if (file.isUnderConstruction()) {
out.writeBoolean(true); out.writeBoolean(true);
final INodeFileUnderConstruction uc = (INodeFileUnderConstruction)file; final FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
writeString(uc.getClientName(), out); writeString(uc.getClientName(), out);
writeString(uc.getClientMachine(), out); writeString(uc.getClientMachine(), out);
} else { } else {

View File

@ -2208,13 +2208,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final DatanodeDescriptor clientNode = final DatanodeDescriptor clientNode =
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine); blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
INodeFileUnderConstruction newNode = dir.addFile(src, permissions, INodeFile newNode = dir.addFile(src, permissions, replication, blockSize,
replication, blockSize, holder, clientMachine, clientNode); holder, clientMachine, clientNode);
if (newNode == null) { if (newNode == null) {
throw new IOException("DIR* NameSystem.startFile: " + throw new IOException("DIR* NameSystem.startFile: " +
"Unable to add file to namespace."); "Unable to add file to namespace.");
} }
leaseManager.addLease(newNode.getClientName(), src); leaseManager.addLease(newNode.getFileUnderConstructionFeature()
.getClientName(), src);
// record file record in log, record new generation stamp // record file record in log, record new generation stamp
getEditLog().logOpenFile(src, newNode, logRetryEntry); getEditLog().logOpenFile(src, newNode, logRetryEntry);
@ -2306,11 +2307,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean writeToEditLog, Snapshot latestSnapshot, boolean logRetryCache) boolean writeToEditLog, Snapshot latestSnapshot, boolean logRetryCache)
throws IOException { throws IOException {
file = file.recordModification(latestSnapshot, dir.getINodeMap()); file = file.recordModification(latestSnapshot, dir.getINodeMap());
final INodeFileUnderConstruction cons = file.toUnderConstruction( final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine,
leaseHolder, clientMachine, clientNode); clientNode);
dir.replaceINodeFile(src, file, cons); leaseManager.addLease(cons.getFileUnderConstructionFeature()
leaseManager.addLease(cons.getClientName(), src); .getClientName(), src);
LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons); LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
if (writeToEditLog) { if (writeToEditLog) {
@ -2373,7 +2374,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException { throws IOException {
assert hasWriteLock(); assert hasWriteLock();
if (fileInode != null && fileInode.isUnderConstruction()) { if (fileInode != null && fileInode.isUnderConstruction()) {
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
// //
// If the file is under construction , then it must be in our // If the file is under construction , then it must be in our
// leases. Find the appropriate lease record. // leases. Find the appropriate lease record.
@ -2396,7 +2396,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// //
// Find the original holder. // Find the original holder.
// //
lease = leaseManager.getLease(pendingFile.getClientName()); FileUnderConstructionFeature uc = fileInode.getFileUnderConstructionFeature();
String clientName = uc.getClientName();
lease = leaseManager.getLease(clientName);
if (lease == null) { if (lease == null) {
throw new AlreadyBeingCreatedException( throw new AlreadyBeingCreatedException(
"failed to create file " + src + " for " + holder + "failed to create file " + src + " for " + holder +
@ -2407,26 +2409,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// close now: no need to wait for soft lease expiration and // close now: no need to wait for soft lease expiration and
// close only the file src // close only the file src
LOG.info("recoverLease: " + lease + ", src=" + src + LOG.info("recoverLease: " + lease + ", src=" + src +
" from client " + pendingFile.getClientName()); " from client " + clientName);
internalReleaseLease(lease, src, holder); internalReleaseLease(lease, src, holder);
} else { } else {
assert lease.getHolder().equals(pendingFile.getClientName()) : assert lease.getHolder().equals(clientName) :
"Current lease holder " + lease.getHolder() + "Current lease holder " + lease.getHolder() +
" does not match file creator " + pendingFile.getClientName(); " does not match file creator " + clientName;
// //
// If the original holder has not renewed in the last SOFTLIMIT // If the original holder has not renewed in the last SOFTLIMIT
// period, then start lease recovery. // period, then start lease recovery.
// //
if (lease.expiredSoftLimit()) { if (lease.expiredSoftLimit()) {
LOG.info("startFile: recover " + lease + ", src=" + src + " client " LOG.info("startFile: recover " + lease + ", src=" + src + " client "
+ pendingFile.getClientName()); + clientName);
boolean isClosed = internalReleaseLease(lease, src, null); boolean isClosed = internalReleaseLease(lease, src, null);
if(!isClosed) if(!isClosed)
throw new RecoveryInProgressException( throw new RecoveryInProgressException(
"Failed to close file " + src + "Failed to close file " + src +
". Lease recovery is in progress. Try again later."); ". Lease recovery is in progress. Try again later.");
} else { } else {
final BlockInfo lastBlock = pendingFile.getLastBlock(); final BlockInfo lastBlock = fileInode.getLastBlock();
if (lastBlock != null if (lastBlock != null
&& lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) { && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
throw new RecoveryInProgressException("Recovery in progress, file [" throw new RecoveryInProgressException("Recovery in progress, file ["
@ -2435,8 +2437,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new AlreadyBeingCreatedException("Failed to create file [" throw new AlreadyBeingCreatedException("Failed to create file ["
+ src + "] for [" + holder + "] on client [" + clientMachine + src + "] for [" + holder + "] on client [" + clientMachine
+ "], because this file is already being created by [" + "], because this file is already being created by ["
+ pendingFile.getClientName() + "] on [" + clientName + "] on ["
+ pendingFile.getClientMachine() + "]"); + uc.getClientMachine() + "]");
} }
} }
} }
@ -2566,8 +2568,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LocatedBlock[] onRetryBlock = new LocatedBlock[1]; LocatedBlock[] onRetryBlock = new LocatedBlock[1];
final INode[] inodes = analyzeFileState( final INode[] inodes = analyzeFileState(
src, fileId, clientName, previous, onRetryBlock).getINodes(); src, fileId, clientName, previous, onRetryBlock).getINodes();
final INodeFileUnderConstruction pendingFile = final INodeFile pendingFile = inodes[inodes.length - 1].asFile();
(INodeFileUnderConstruction) inodes[inodes.length - 1].asFile();
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) { if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
// This is a retry. Just return the last block if having locations. // This is a retry. Just return the last block if having locations.
@ -2580,7 +2581,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ maxBlocksPerFile); + maxBlocksPerFile);
} }
blockSize = pendingFile.getPreferredBlockSize(); blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode(); clientNode = pendingFile.getFileUnderConstructionFeature().getClientNode();
replication = pendingFile.getFileReplication(); replication = pendingFile.getFileReplication();
} finally { } finally {
readUnlock(); readUnlock();
@ -2604,8 +2605,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
INodesInPath inodesInPath = INodesInPath inodesInPath =
analyzeFileState(src, fileId, clientName, previous, onRetryBlock); analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
INode[] inodes = inodesInPath.getINodes(); INode[] inodes = inodesInPath.getINodes();
final INodeFileUnderConstruction pendingFile = final INodeFile pendingFile = inodes[inodes.length - 1].asFile();
(INodeFileUnderConstruction) inodes[inodes.length - 1].asFile();
if (onRetryBlock[0] != null) { if (onRetryBlock[0] != null) {
if (onRetryBlock[0].getLocations().length > 0) { if (onRetryBlock[0].getLocations().length > 0) {
@ -2660,7 +2660,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
Block previousBlock = ExtendedBlock.getLocalBlock(previous); Block previousBlock = ExtendedBlock.getLocalBlock(previous);
final INodesInPath iip = dir.getINodesInPath4Write(src); final INodesInPath iip = dir.getINodesInPath4Write(src);
final INodeFileUnderConstruction pendingFile final INodeFile pendingFile
= checkLease(src, fileId, clientName, iip.getLastINode()); = checkLease(src, fileId, clientName, iip.getLastINode());
BlockInfo lastBlockInFile = pendingFile.getLastBlock(); BlockInfo lastBlockInFile = pendingFile.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) { if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
@ -2766,8 +2766,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
src = FSDirectory.resolvePath(src, pathComponents, dir); src = FSDirectory.resolvePath(src, pathComponents, dir);
//check lease //check lease
final INodeFileUnderConstruction file = checkLease(src, clientName); final INodeFile file = checkLease(src, clientName);
clientnode = file.getClientNode(); clientnode = file.getFileUnderConstructionFeature().getClientNode();
preferredblocksize = file.getPreferredBlockSize(); preferredblocksize = file.getPreferredBlockSize();
//find datanode descriptors //find datanode descriptors
@ -2813,7 +2813,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// //
// Remove the block from the pending creates list // Remove the block from the pending creates list
// //
INodeFileUnderConstruction file = checkLease(src, holder); INodeFile file = checkLease(src, holder);
boolean removed = dir.removeBlock(src, file, boolean removed = dir.removeBlock(src, file,
ExtendedBlock.getLocalBlock(b)); ExtendedBlock.getLocalBlock(b));
if (!removed) { if (!removed) {
@ -2835,16 +2835,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
/** make sure that we still have the lease on this file. */ /** make sure that we still have the lease on this file. */
private INodeFileUnderConstruction checkLease(String src, String holder) private INodeFile checkLease(String src, String holder)
throws LeaseExpiredException, UnresolvedLinkException, throws LeaseExpiredException, UnresolvedLinkException,
FileNotFoundException { FileNotFoundException {
return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder, return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder,
dir.getINode(src)); dir.getINode(src));
} }
private INodeFileUnderConstruction checkLease(String src, long fileId, private INodeFile checkLease(String src, long fileId, String holder,
String holder, INode inode) throws LeaseExpiredException, INode inode) throws LeaseExpiredException, FileNotFoundException {
FileNotFoundException {
assert hasReadLock(); assert hasReadLock();
if (inode == null || !inode.isFile()) { if (inode == null || !inode.isFile()) {
Lease lease = leaseManager.getLease(holder); Lease lease = leaseManager.getLease(holder);
@ -2861,13 +2860,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ (lease != null ? lease.toString() + (lease != null ? lease.toString()
: "Holder " + holder + " does not have any open files.")); : "Holder " + holder + " does not have any open files."));
} }
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file; String clientName = file.getFileUnderConstructionFeature().getClientName();
if (holder != null && !pendingFile.getClientName().equals(holder)) { if (holder != null && !clientName.equals(holder)) {
throw new LeaseExpiredException("Lease mismatch on " + src + " owned by " throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
+ pendingFile.getClientName() + " but is accessed by " + holder); + clientName + " but is accessed by " + holder);
} }
INodeId.checkId(fileId, pendingFile); INodeId.checkId(fileId, file);
return pendingFile; return file;
} }
/** /**
@ -2908,7 +2907,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
UnresolvedLinkException, IOException { UnresolvedLinkException, IOException {
assert hasWriteLock(); assert hasWriteLock();
final INodesInPath iip = dir.getLastINodeInPath(src); final INodesInPath iip = dir.getLastINodeInPath(src);
final INodeFileUnderConstruction pendingFile; final INodeFile pendingFile;
try { try {
pendingFile = checkLease(src, fileId, holder, iip.getINode(0)); pendingFile = checkLease(src, fileId, holder, iip.getINode(0));
} catch (LeaseExpiredException lee) { } catch (LeaseExpiredException lee) {
@ -3599,9 +3598,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot fsync file " + src); checkNameNodeSafeMode("Cannot fsync file " + src);
src = FSDirectory.resolvePath(src, pathComponents, dir); src = FSDirectory.resolvePath(src, pathComponents, dir);
INodeFileUnderConstruction pendingFile = checkLease(src, clientName); INodeFile pendingFile = checkLease(src, clientName);
if (lastBlockLength > 0) { if (lastBlockLength > 0) {
pendingFile.updateLengthOfLastBlock(lastBlockLength); pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
pendingFile, lastBlockLength);
} }
dir.persistBlocks(src, pendingFile, false); dir.persistBlocks(src, pendingFile, false);
} finally { } finally {
@ -3632,8 +3632,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
assert hasWriteLock(); assert hasWriteLock();
final INodesInPath iip = dir.getLastINodeInPath(src); final INodesInPath iip = dir.getLastINodeInPath(src);
final INodeFileUnderConstruction pendingFile final INodeFile pendingFile = iip.getINode(0).asFile();
= INodeFileUnderConstruction.valueOf(iip.getINode(0), src);
int nrBlocks = pendingFile.numBlocks(); int nrBlocks = pendingFile.numBlocks();
BlockInfo[] blocks = pendingFile.getBlocks(); BlockInfo[] blocks = pendingFile.getBlocks();
@ -3755,7 +3754,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
private Lease reassignLease(Lease lease, String src, String newHolder, private Lease reassignLease(Lease lease, String src, String newHolder,
INodeFileUnderConstruction pendingFile) { INodeFile pendingFile) {
assert hasWriteLock(); assert hasWriteLock();
if(newHolder == null) if(newHolder == null)
return lease; return lease;
@ -3765,15 +3764,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
Lease reassignLeaseInternal(Lease lease, String src, String newHolder, Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
INodeFileUnderConstruction pendingFile) { INodeFile pendingFile) {
assert hasWriteLock(); assert hasWriteLock();
pendingFile.setClientName(newHolder); pendingFile.getFileUnderConstructionFeature().setClientName(newHolder);
return leaseManager.reassignLease(lease, src, newHolder); return leaseManager.reassignLease(lease, src, newHolder);
} }
private void commitOrCompleteLastBlock(final INodeFileUnderConstruction fileINode, private void commitOrCompleteLastBlock(final INodeFile fileINode,
final Block commitBlock) throws IOException { final Block commitBlock) throws IOException {
assert hasWriteLock(); assert hasWriteLock();
Preconditions.checkArgument(fileINode.isUnderConstruction());
if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) { if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
return; return;
} }
@ -3791,18 +3791,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
private void finalizeINodeFileUnderConstruction(String src, private void finalizeINodeFileUnderConstruction(String src,
INodeFileUnderConstruction pendingFile, Snapshot latestSnapshot) INodeFile pendingFile, Snapshot latestSnapshot) throws IOException,
throws IOException, UnresolvedLinkException { UnresolvedLinkException {
assert hasWriteLock(); assert hasWriteLock();
leaseManager.removeLease(pendingFile.getClientName(), src); FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
Preconditions.checkArgument(uc != null);
leaseManager.removeLease(uc.getClientName(), src);
pendingFile = pendingFile.recordModification(latestSnapshot, pendingFile = pendingFile.recordModification(latestSnapshot,
dir.getINodeMap()); dir.getINodeMap());
// The file is no longer pending. // The file is no longer pending.
// Create permanent INode, update blocks // Create permanent INode, update blocks. No need to replace the inode here
final INodeFile newFile = pendingFile.toINodeFile(now()); // since we just remove the uc feature from pendingFile
dir.replaceINodeFile(src, pendingFile, newFile); final INodeFile newFile = pendingFile.toCompleteFile(now());
// close file and persist block allocations for this file // close file and persist block allocations for this file
dir.closeFile(src, newFile); dir.closeFile(src, newFile);
@ -3819,12 +3821,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
public boolean isInSnapshot(BlockInfoUnderConstruction blockUC) { public boolean isInSnapshot(BlockInfoUnderConstruction blockUC) {
assert hasReadLock(); assert hasReadLock();
final BlockCollection bc = blockUC.getBlockCollection(); final BlockCollection bc = blockUC.getBlockCollection();
if (bc == null || !(bc instanceof INodeFileUnderConstruction)) { if (bc == null || !(bc instanceof INodeFile)
|| !((INodeFile) bc).isUnderConstruction()) {
return false; return false;
} }
INodeFileUnderConstruction inodeUC = (INodeFileUnderConstruction) blockUC INodeFile inodeUC = (INodeFile) bc;
.getBlockCollection();
String fullName = inodeUC.getName(); String fullName = inodeUC.getName();
try { try {
if (fullName != null && fullName.startsWith(Path.SEPARATOR) if (fullName != null && fullName.startsWith(Path.SEPARATOR)
@ -3902,11 +3904,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ recoveryId + " for block " + lastblock); + recoveryId + " for block " + lastblock);
} }
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
if (deleteblock) { if (deleteblock) {
Block blockToDel = ExtendedBlock.getLocalBlock(lastblock); Block blockToDel = ExtendedBlock.getLocalBlock(lastblock);
boolean remove = pendingFile.removeLastBlock(blockToDel); boolean remove = iFile.removeLastBlock(blockToDel);
if (remove) { if (remove) {
blockManager.removeBlockFromMap(storedBlock); blockManager.removeBlockFromMap(storedBlock);
} }
@ -3944,14 +3944,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// add pipeline locations into the INodeUnderConstruction // add pipeline locations into the INodeUnderConstruction
DatanodeDescriptor[] targetArray = DatanodeDescriptor[] targetArray =
new DatanodeDescriptor[targetList.size()]; new DatanodeDescriptor[targetList.size()];
pendingFile.setLastBlock(storedBlock, targetList.toArray(targetArray)); iFile.setLastBlock(storedBlock, targetList.toArray(targetArray));
} }
if (closeFile) { if (closeFile) {
src = closeFileCommitBlocks(pendingFile, storedBlock); src = closeFileCommitBlocks(iFile, storedBlock);
} else { } else {
// If this commit does not want to close the file, persist blocks // If this commit does not want to close the file, persist blocks
src = persistBlocks(pendingFile, false); src = persistBlocks(iFile, false);
} }
} finally { } finally {
writeUnlock(); writeUnlock();
@ -3976,10 +3976,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws IOException * @throws IOException
*/ */
@VisibleForTesting @VisibleForTesting
String closeFileCommitBlocks(INodeFileUnderConstruction pendingFile, String closeFileCommitBlocks(INodeFile pendingFile, BlockInfo storedBlock)
BlockInfo storedBlock)
throws IOException { throws IOException {
String src = leaseManager.findPath(pendingFile); String src = leaseManager.findPath(pendingFile);
// commit the last block and complete it if it has minimum replicas // commit the last block and complete it if it has minimum replicas
@ -4000,8 +3998,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws IOException * @throws IOException
*/ */
@VisibleForTesting @VisibleForTesting
String persistBlocks(INodeFileUnderConstruction pendingFile, String persistBlocks(INodeFile pendingFile, boolean logRetryCache)
boolean logRetryCache) throws IOException { throws IOException {
String src = leaseManager.findPath(pendingFile); String src = leaseManager.findPath(pendingFile);
dir.persistBlocks(src, pendingFile, logRetryCache); dir.persistBlocks(src, pendingFile, logRetryCache);
return src; return src;
@ -5186,13 +5184,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try { try {
for (Lease lease : leaseManager.getSortedLeases()) { for (Lease lease : leaseManager.getSortedLeases()) {
for (String path : lease.getPaths()) { for (String path : lease.getPaths()) {
final INodeFileUnderConstruction cons; final INodeFile cons;
try { try {
cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path); cons = dir.getINode(path).asFile();
Preconditions.checkState(cons.isUnderConstruction());
} catch (UnresolvedLinkException e) { } catch (UnresolvedLinkException e) {
throw new AssertionError("Lease files should reside on this FS"); throw new AssertionError("Lease files should reside on this FS");
} catch (IOException e) {
throw new RuntimeException(e);
} }
BlockInfo[] blocks = cons.getBlocks(); BlockInfo[] blocks = cons.getBlocks();
if(blocks == null) if(blocks == null)
@ -5768,7 +5765,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return blockId; return blockId;
} }
private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block, private INodeFile checkUCBlock(ExtendedBlock block,
String clientName) throws IOException { String clientName) throws IOException {
assert hasWriteLock(); assert hasWriteLock();
checkNameNodeSafeMode("Cannot get a new generation stamp and an " checkNameNodeSafeMode("Cannot get a new generation stamp and an "
@ -5784,19 +5781,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// check file inode // check file inode
final INodeFile file = ((INode)storedBlock.getBlockCollection()).asFile(); final INodeFile file = ((INode)storedBlock.getBlockCollection()).asFile();
if (file==null || !file.isUnderConstruction()) { if (file == null || !file.isUnderConstruction()) {
throw new IOException("The file " + storedBlock + throw new IOException("The file " + storedBlock +
" belonged to does not exist or it is not under construction."); " belonged to does not exist or it is not under construction.");
} }
// check lease // check lease
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file; if (clientName == null
if (clientName == null || !clientName.equals(pendingFile.getClientName())) { || !clientName.equals(file.getFileUnderConstructionFeature()
.getClientName())) {
throw new LeaseExpiredException("Lease mismatch: " + block + throw new LeaseExpiredException("Lease mismatch: " + block +
" is accessed by a non lease holder " + clientName); " is accessed by a non lease holder " + clientName);
} }
return pendingFile; return file;
} }
/** /**
@ -5905,8 +5903,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException { throws IOException {
assert hasWriteLock(); assert hasWriteLock();
// check the vadility of the block and lease holder name // check the vadility of the block and lease holder name
final INodeFileUnderConstruction pendingFile final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
= checkUCBlock(oldBlock, clientName);
final BlockInfoUnderConstruction blockinfo final BlockInfoUnderConstruction blockinfo
= (BlockInfoUnderConstruction)pendingFile.getLastBlock(); = (BlockInfoUnderConstruction)pendingFile.getLastBlock();
@ -5950,15 +5947,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* Serializes leases. * Serializes leases.
*/ */
void saveFilesUnderConstruction(DataOutputStream out, void saveFilesUnderConstruction(DataOutputStream out,
Map<Long, INodeFileUnderConstruction> snapshotUCMap) throws IOException { Map<Long, INodeFile> snapshotUCMap) throws IOException {
// This is run by an inferior thread of saveNamespace, which holds a read // This is run by an inferior thread of saveNamespace, which holds a read
// lock on our behalf. If we took the read lock here, we could block // lock on our behalf. If we took the read lock here, we could block
// for fairness if a writer is waiting on the lock. // for fairness if a writer is waiting on the lock.
synchronized (leaseManager) { synchronized (leaseManager) {
Map<String, INodeFileUnderConstruction> nodes = Map<String, INodeFile> nodes = leaseManager.getINodesUnderConstruction();
leaseManager.getINodesUnderConstruction(); for (Map.Entry<String, INodeFile> entry : nodes.entrySet()) {
for (Map.Entry<String, INodeFileUnderConstruction> entry
: nodes.entrySet()) {
// TODO: for HDFS-5428, because of rename operations, some // TODO: for HDFS-5428, because of rename operations, some
// under-construction files that are // under-construction files that are
// in the current fs directory can also be captured in the // in the current fs directory can also be captured in the
@ -5967,13 +5962,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
out.writeInt(nodes.size() + snapshotUCMap.size()); // write the size out.writeInt(nodes.size() + snapshotUCMap.size()); // write the size
for (Map.Entry<String, INodeFileUnderConstruction> entry for (Map.Entry<String, INodeFile> entry : nodes.entrySet()) {
: nodes.entrySet()) {
FSImageSerialization.writeINodeUnderConstruction( FSImageSerialization.writeINodeUnderConstruction(
out, entry.getValue(), entry.getKey()); out, entry.getValue(), entry.getKey());
} }
for (Map.Entry<Long, INodeFileUnderConstruction> entry for (Map.Entry<Long, INodeFile> entry : snapshotUCMap.entrySet()) {
: snapshotUCMap.entrySet()) {
// for those snapshot INodeFileUC, we use "/.reserved/.inodes/<inodeid>" // for those snapshot INodeFileUC, we use "/.reserved/.inodes/<inodeid>"
// as their paths // as their paths
StringBuilder b = new StringBuilder(); StringBuilder b = new StringBuilder();

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
/**
* I-node for file being written.
*/
@InterfaceAudience.Private
public class FileUnderConstructionFeature extends INodeFile.Feature {
private String clientName; // lease holder
private final String clientMachine;
// if client is a cluster node too.
private final DatanodeDescriptor clientNode;
public FileUnderConstructionFeature(final String clientName,
final String clientMachine,
final DatanodeDescriptor clientNode) {
this.clientName = clientName;
this.clientMachine = clientMachine;
this.clientNode = clientNode;
}
public String getClientName() {
return clientName;
}
void setClientName(String clientName) {
this.clientName = clientName;
}
public String getClientMachine() {
return clientMachine;
}
public DatanodeDescriptor getClientNode() {
return clientNode;
}
/**
* Update the length for the last block
*
* @param lastBlockLength
* The length of the last block reported from client
* @throws IOException
*/
void updateLengthOfLastBlock(INodeFile f, long lastBlockLength)
throws IOException {
BlockInfo lastBlock = f.getLastBlock();
assert (lastBlock != null) : "The last block for path "
+ f.getFullPathName() + " is null when updating its length";
assert (lastBlock instanceof BlockInfoUnderConstruction)
: "The last block for path " + f.getFullPathName()
+ " is not a BlockInfoUnderConstruction when updating its length";
lastBlock.setNumBytes(lastBlockLength);
}
/**
* When deleting a file in the current fs directory, and the file is contained
* in a snapshot, we should delete the last block if it's under construction
* and its size is 0.
*/
void cleanZeroSizeBlock(final INodeFile f,
final BlocksMapUpdateInfo collectedBlocks) {
final BlockInfo[] blocks = f.getBlocks();
if (blocks != null && blocks.length > 0
&& blocks[blocks.length - 1] instanceof BlockInfoUnderConstruction) {
BlockInfoUnderConstruction lastUC =
(BlockInfoUnderConstruction) blocks[blocks.length - 1];
if (lastUC.getNumBytes() == 0) {
// this is a 0-sized block. do not need check its UC state here
collectedBlocks.addDeleteBlock(lastUC);
f.removeLastBlock(lastUC);
}
}
}
}

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount; import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable; import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ReadOnlyList; import org.apache.hadoop.hdfs.util.ReadOnlyList;
@ -205,23 +204,6 @@ public class INodeDirectory extends INodeWithAdditionalFields
return newDir; return newDir;
} }
/**
* Used when load fileUC from fsimage. The file to be replaced is actually
* only in snapshot, thus may not be contained in the children list.
* See HDFS-5428 for details.
*/
public void replaceChildFileInSnapshot(INodeFile oldChild,
final INodeFile newChild) {
if (children != null) {
final int i = searchChildren(newChild.getLocalNameBytes());
if (i >= 0 && children.get(i).getId() == oldChild.getId()) {
// no need to consider reference node here, since we already do the
// replacement in FSImageFormat.Loader#loadFilesUnderConstruction
children.set(i, newChild);
}
}
}
/** Replace the given child with a new child. */ /** Replace the given child with a new child. */
public void replaceChild(INode oldChild, final INode newChild, public void replaceChild(INode oldChild, final INode newChild,
final INodeMap inodeMap) { final INodeMap inodeMap) {
@ -291,17 +273,6 @@ public class INodeDirectory extends INodeWithAdditionalFields
return newChild; return newChild;
} }
/** Replace a child {@link INodeFile} with an {@link INodeFileUnderConstructionWithSnapshot}. */
INodeFileUnderConstructionWithSnapshot replaceChild4INodeFileUcWithSnapshot(
final INodeFileUnderConstruction child, final INodeMap inodeMap) {
Preconditions.checkArgument(!(child instanceof INodeFileUnderConstructionWithSnapshot),
"Child file is already an INodeFileUnderConstructionWithSnapshot, child=" + child);
final INodeFileUnderConstructionWithSnapshot newChild
= new INodeFileUnderConstructionWithSnapshot(child, null);
replaceChildFile(child, newChild, inodeMap);
return newChild;
}
@Override @Override
public INodeDirectory recordModification(Snapshot latest, public INodeDirectory recordModification(Snapshot latest,
final INodeMap inodeMap) throws QuotaExceededException { final INodeMap inodeMap) throws QuotaExceededException {

View File

@ -20,15 +20,18 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
@ -43,6 +46,22 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private @InterfaceAudience.Private
public class INodeFile extends INodeWithAdditionalFields public class INodeFile extends INodeWithAdditionalFields
implements INodeFileAttributes, BlockCollection { implements INodeFileAttributes, BlockCollection {
/**
* A feature contains specific information for a type of INodeFile. E.g.,
* we can have separate features for Under-Construction and Snapshot.
*/
public static abstract class Feature {
private Feature nextFeature;
public Feature getNextFeature() {
return nextFeature;
}
public void setNextFeature(Feature next) {
this.nextFeature = next;
}
}
/** The same as valueOf(inode, path, false). */ /** The same as valueOf(inode, path, false). */
public static INodeFile valueOf(INode inode, String path public static INodeFile valueOf(INode inode, String path
) throws FileNotFoundException { ) throws FileNotFoundException {
@ -104,8 +123,11 @@ public class INodeFile extends INodeWithAdditionalFields
private BlockInfo[] blocks; private BlockInfo[] blocks;
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime, private Feature headFeature;
BlockInfo[] blklist, short replication, long preferredBlockSize) {
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
long atime, BlockInfo[] blklist, short replication,
long preferredBlockSize) {
super(id, name, permissions, mtime, atime); super(id, name, permissions, mtime, atime);
header = HeaderFormat.combineReplication(header, replication); header = HeaderFormat.combineReplication(header, replication);
header = HeaderFormat.combinePreferredBlockSize(header, preferredBlockSize); header = HeaderFormat.combinePreferredBlockSize(header, preferredBlockSize);
@ -116,6 +138,48 @@ public class INodeFile extends INodeWithAdditionalFields
super(that); super(that);
this.header = that.header; this.header = that.header;
this.blocks = that.blocks; this.blocks = that.blocks;
this.headFeature = that.headFeature;
}
/**
* If the inode contains a {@link FileUnderConstructionFeature}, return it;
* otherwise, return null.
*/
public final FileUnderConstructionFeature getFileUnderConstructionFeature() {
for (Feature f = this.headFeature; f != null; f = f.nextFeature) {
if (f instanceof FileUnderConstructionFeature) {
return (FileUnderConstructionFeature) f;
}
}
return null;
}
/** Is this file under construction? */
@Override // BlockCollection
public boolean isUnderConstruction() {
return getFileUnderConstructionFeature() != null;
}
void addFeature(Feature f) {
f.nextFeature = headFeature;
headFeature = f;
}
void removeFeature(Feature f) {
if (f == headFeature) {
headFeature = headFeature.nextFeature;
return;
} else if (headFeature != null) {
Feature prev = headFeature;
Feature curr = headFeature.nextFeature;
for (; curr != null && curr != f; prev = curr, curr = curr.nextFeature)
;
if (curr != null) {
prev.nextFeature = curr.nextFeature;
return;
}
}
throw new IllegalStateException("Feature " + f + " not found.");
} }
/** @return true unconditionally. */ /** @return true unconditionally. */
@ -130,22 +194,88 @@ public class INodeFile extends INodeWithAdditionalFields
return this; return this;
} }
/** Is this file under construction? */ /* Start of Under-Construction Feature */
public boolean isUnderConstruction() {
return false;
}
/** Convert this file to an {@link INodeFileUnderConstruction}. */ /** Convert this file to an {@link INodeFileUnderConstruction}. */
public INodeFileUnderConstruction toUnderConstruction( public INodeFile toUnderConstruction(String clientName, String clientMachine,
String clientName,
String clientMachine,
DatanodeDescriptor clientNode) { DatanodeDescriptor clientNode) {
Preconditions.checkState(!isUnderConstruction(), Preconditions.checkState(!isUnderConstruction(),
"file is already an INodeFileUnderConstruction"); "file is already an INodeFileUnderConstruction");
return new INodeFileUnderConstruction(this, FileUnderConstructionFeature uc = new FileUnderConstructionFeature(
clientName, clientMachine, clientNode); clientName, clientMachine, clientNode);
addFeature(uc);
return this;
} }
/**
* Convert the file to a complete file, i.e., to remove the Under-Construction
* feature.
*/
public INodeFile toCompleteFile(long mtime) {
FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
if (uc != null) {
assertAllBlocksComplete();
removeFeature(uc);
this.setModificationTime(mtime);
}
return this;
}
/** Assert all blocks are complete. */
private void assertAllBlocksComplete() {
if (blocks == null) {
return;
}
for (int i = 0; i < blocks.length; i++) {
Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
+ " %s %s since blocks[%s] is non-complete, where blocks=%s.",
getClass().getSimpleName(), this, i, Arrays.asList(blocks));
}
}
@Override //BlockCollection
public void setBlock(int index, BlockInfo blk) {
this.blocks[index] = blk;
}
@Override // BlockCollection
public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
DatanodeDescriptor[] locations) throws IOException {
Preconditions.checkState(isUnderConstruction());
if (numBlocks() == 0) {
throw new IOException("Failed to set last block: File is empty.");
}
BlockInfoUnderConstruction ucBlock =
lastBlock.convertToBlockUnderConstruction(
BlockUCState.UNDER_CONSTRUCTION, locations);
ucBlock.setBlockCollection(this);
setBlock(numBlocks() - 1, ucBlock);
return ucBlock;
}
/**
* Remove a block from the block list. This block should be
* the last one on the list.
*/
boolean removeLastBlock(Block oldblock) {
if (blocks == null || blocks.length == 0) {
return false;
}
int size_1 = blocks.length - 1;
if (!blocks[size_1].equals(oldblock)) {
return false;
}
//copy to a new list
BlockInfo[] newlist = new BlockInfo[size_1];
System.arraycopy(blocks, 0, newlist, 0, size_1);
setBlocks(newlist);
return true;
}
/* End of Under-Construction Feature */
@Override @Override
public INodeFileAttributes getSnapshotINode(final Snapshot snapshot) { public INodeFileAttributes getSnapshotINode(final Snapshot snapshot) {
return this; return this;
@ -266,11 +396,6 @@ public class INodeFile extends INodeWithAdditionalFields
} }
} }
/** Set the block of the file at the given index. */
public void setBlock(int idx, BlockInfo blk) {
this.blocks[idx] = blk;
}
/** Set the blocks. */ /** Set the blocks. */
public void setBlocks(BlockInfo[] blocks) { public void setBlocks(BlockInfo[] blocks) {
this.blocks = blocks; this.blocks = blocks;
@ -286,6 +411,11 @@ public class INodeFile extends INodeWithAdditionalFields
// this only happens when deleting the current file // this only happens when deleting the current file
computeQuotaUsage(counts, false); computeQuotaUsage(counts, false);
destroyAndCollectBlocks(collectedBlocks, removedINodes); destroyAndCollectBlocks(collectedBlocks, removedINodes);
} else if (snapshot == null && prior != null) {
FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
if (uc != null) {
uc.cleanZeroSizeBlock(this, collectedBlocks);
}
} }
return counts; return counts;
} }

View File

@ -1,247 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.Quota.Counts;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import com.google.common.base.Preconditions;
/**
* I-node for file being written.
*/
@InterfaceAudience.Private
public class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection {
/** Cast INode to INodeFileUnderConstruction. */
public static INodeFileUnderConstruction valueOf(INode inode, String path
) throws FileNotFoundException {
final INodeFile file = INodeFile.valueOf(inode, path);
if (!file.isUnderConstruction()) {
throw new FileNotFoundException("File is not under construction: " + path);
}
return (INodeFileUnderConstruction)file;
}
private String clientName; // lease holder
private final String clientMachine;
private final DatanodeDescriptor clientNode; // if client is a cluster node too.
INodeFileUnderConstruction(long id,
PermissionStatus permissions,
short replication,
long preferredBlockSize,
long modTime,
String clientName,
String clientMachine,
DatanodeDescriptor clientNode) {
this(id, null, replication, modTime, preferredBlockSize, BlockInfo.EMPTY_ARRAY,
permissions, clientName, clientMachine, clientNode);
}
INodeFileUnderConstruction(long id,
byte[] name,
short blockReplication,
long modificationTime,
long preferredBlockSize,
BlockInfo[] blocks,
PermissionStatus perm,
String clientName,
String clientMachine,
DatanodeDescriptor clientNode) {
super(id, name, perm, modificationTime, modificationTime,
blocks, blockReplication, preferredBlockSize);
this.clientName = clientName;
this.clientMachine = clientMachine;
this.clientNode = clientNode;
}
public INodeFileUnderConstruction(final INodeFile that,
final String clientName,
final String clientMachine,
final DatanodeDescriptor clientNode) {
super(that);
this.clientName = clientName;
this.clientMachine = clientMachine;
this.clientNode = clientNode;
}
public String getClientName() {
return clientName;
}
void setClientName(String clientName) {
this.clientName = clientName;
}
public String getClientMachine() {
return clientMachine;
}
public DatanodeDescriptor getClientNode() {
return clientNode;
}
/** @return true unconditionally. */
@Override
public final boolean isUnderConstruction() {
return true;
}
/**
* Converts an INodeFileUnderConstruction to an INodeFile.
* The original modification time is used as the access time.
* The new modification is the specified mtime.
*/
protected INodeFile toINodeFile(long mtime) {
assertAllBlocksComplete();
final INodeFile f = new INodeFile(getId(), getLocalNameBytes(),
getPermissionStatus(), mtime, getModificationTime(),
getBlocks(), getFileReplication(), getPreferredBlockSize());
f.setParent(getParent());
return f;
}
@Override
public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior,
final BlocksMapUpdateInfo collectedBlocks,
final List<INode> removedINodes, final boolean countDiffChange)
throws QuotaExceededException {
if (snapshot == null && prior != null) {
cleanZeroSizeBlock(collectedBlocks);
return Counts.newInstance();
} else {
return super.cleanSubtree(snapshot, prior, collectedBlocks,
removedINodes, countDiffChange);
}
}
/**
* When deleting a file in the current fs directory, and the file is contained
* in a snapshot, we should delete the last block if it's under construction
* and its size is 0.
*/
private void cleanZeroSizeBlock(final BlocksMapUpdateInfo collectedBlocks) {
final BlockInfo[] blocks = getBlocks();
if (blocks != null && blocks.length > 0
&& blocks[blocks.length - 1] instanceof BlockInfoUnderConstruction) {
BlockInfoUnderConstruction lastUC =
(BlockInfoUnderConstruction) blocks[blocks.length - 1];
if (lastUC.getNumBytes() == 0) {
// this is a 0-sized block. do not need check its UC state here
collectedBlocks.addDeleteBlock(lastUC);
removeLastBlock(lastUC);
}
}
}
@Override
public INodeFileUnderConstruction recordModification(final Snapshot latest,
final INodeMap inodeMap) throws QuotaExceededException {
if (isInLatestSnapshot(latest)) {
INodeFileUnderConstructionWithSnapshot newFile = getParent()
.replaceChild4INodeFileUcWithSnapshot(this, inodeMap)
.recordModification(latest, inodeMap);
return newFile;
} else {
return this;
}
}
/** Assert all blocks are complete. */
protected void assertAllBlocksComplete() {
final BlockInfo[] blocks = getBlocks();
for (int i = 0; i < blocks.length; i++) {
Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
+ " %s %s since blocks[%s] is non-complete, where blocks=%s.",
getClass().getSimpleName(), this, i, Arrays.asList(getBlocks()));
}
}
/**
* Remove a block from the block list. This block should be
* the last one on the list.
*/
boolean removeLastBlock(Block oldblock) {
final BlockInfo[] blocks = getBlocks();
if (blocks == null || blocks.length == 0) {
return false;
}
int size_1 = blocks.length - 1;
if (!blocks[size_1].equals(oldblock)) {
return false;
}
//copy to a new list
BlockInfo[] newlist = new BlockInfo[size_1];
System.arraycopy(blocks, 0, newlist, 0, size_1);
setBlocks(newlist);
return true;
}
/**
* Convert the last block of the file to an under-construction block.
* Set its locations.
*/
@Override
public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
DatanodeDescriptor[] targets) throws IOException {
if (numBlocks() == 0) {
throw new IOException("Failed to set last block: File is empty.");
}
BlockInfoUnderConstruction ucBlock =
lastBlock.convertToBlockUnderConstruction(
BlockUCState.UNDER_CONSTRUCTION, targets);
ucBlock.setBlockCollection(this);
setBlock(numBlocks()-1, ucBlock);
return ucBlock;
}
/**
* Update the length for the last block
*
* @param lastBlockLength
* The length of the last block reported from client
* @throws IOException
*/
void updateLengthOfLastBlock(long lastBlockLength) throws IOException {
BlockInfo lastBlock = this.getLastBlock();
assert (lastBlock != null) : "The last block for path "
+ this.getFullPathName() + " is null when updating its length";
assert (lastBlock instanceof BlockInfoUnderConstruction) : "The last block for path "
+ this.getFullPathName()
+ " is not a BlockInfoUnderConstruction when updating its length";
lastBlock.setNumBytes(lastBlockLength);
}
}

View File

@ -182,9 +182,11 @@ public class LeaseManager {
/** /**
* Finds the pathname for the specified pendingFile * Finds the pathname for the specified pendingFile
*/ */
public synchronized String findPath(INodeFileUnderConstruction pendingFile) public synchronized String findPath(INodeFile pendingFile)
throws IOException { throws IOException {
Lease lease = getLease(pendingFile.getClientName()); FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
Preconditions.checkArgument(uc != null);
Lease lease = getLease(uc.getClientName());
if (lease != null) { if (lease != null) {
String src = lease.findPath(pendingFile); String src = lease.findPath(pendingFile);
if (src != null) { if (src != null) {
@ -253,7 +255,7 @@ public class LeaseManager {
/** /**
* @return the path associated with the pendingFile and null if not found. * @return the path associated with the pendingFile and null if not found.
*/ */
private String findPath(INodeFileUnderConstruction pendingFile) { private String findPath(INodeFile pendingFile) {
try { try {
for (String src : paths) { for (String src : paths) {
INode node = fsnamesystem.dir.getINode(src); INode node = fsnamesystem.dir.getINode(src);
@ -433,14 +435,14 @@ public class LeaseManager {
* @return list of inodes * @return list of inodes
* @throws UnresolvedLinkException * @throws UnresolvedLinkException
*/ */
Map<String, INodeFileUnderConstruction> getINodesUnderConstruction() { Map<String, INodeFile> getINodesUnderConstruction() {
Map<String, INodeFileUnderConstruction> inodes = Map<String, INodeFile> inodes = new TreeMap<String, INodeFile>();
new TreeMap<String, INodeFileUnderConstruction>();
for (String p : sortedLeasesByPath.keySet()) { for (String p : sortedLeasesByPath.keySet()) {
// verify that path exists in namespace // verify that path exists in namespace
try { try {
INode node = fsnamesystem.dir.getINode(p); INodeFile node = INodeFile.valueOf(fsnamesystem.dir.getINode(p), p);
inodes.put(p, INodeFileUnderConstruction.valueOf(node, p)); Preconditions.checkState(node.isUnderConstruction());
inodes.put(p, node);
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.error(ioe); LOG.error(ioe);
} }

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory; import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryAttributes; import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryAttributes;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota; import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeMap; import org.apache.hadoop.hdfs.server.namenode.INodeMap;
import org.apache.hadoop.hdfs.server.namenode.INodeReference; import org.apache.hadoop.hdfs.server.namenode.INodeReference;
import org.apache.hadoop.hdfs.server.namenode.Quota; import org.apache.hadoop.hdfs.server.namenode.Quota;
@ -593,14 +592,6 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
return removed; return removed;
} }
@Override
public void replaceChildFileInSnapshot(final INodeFile oldChild,
final INodeFile newChild) {
super.replaceChildFileInSnapshot(oldChild, newChild);
diffs.replaceChild(ListType.DELETED, oldChild, newChild);
diffs.replaceChild(ListType.CREATED, oldChild, newChild);
}
@Override @Override
public void replaceChild(final INode oldChild, final INode newChild, public void replaceChild(final INode oldChild, final INode newChild,
final INodeMap inodeMap) { final INodeMap inodeMap) {

View File

@ -1,130 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.INodeMap;
import org.apache.hadoop.hdfs.server.namenode.Quota;
/**
* Represent an {@link INodeFileUnderConstruction} that is snapshotted.
*/
@InterfaceAudience.Private
public class INodeFileUnderConstructionWithSnapshot
extends INodeFileUnderConstruction implements FileWithSnapshot {
private final FileDiffList diffs;
private boolean isCurrentFileDeleted = false;
INodeFileUnderConstructionWithSnapshot(final INodeFile f,
final String clientName,
final String clientMachine,
final DatanodeDescriptor clientNode,
final FileDiffList diffs) {
super(f, clientName, clientMachine, clientNode);
this.diffs = diffs != null? diffs: new FileDiffList();
}
/**
* Construct an {@link INodeFileUnderConstructionWithSnapshot} based on an
* {@link INodeFileUnderConstruction}.
*
* @param f The given {@link INodeFileUnderConstruction} instance
*/
public INodeFileUnderConstructionWithSnapshot(INodeFileUnderConstruction f,
final FileDiffList diffs) {
this(f, f.getClientName(), f.getClientMachine(), f.getClientNode(), diffs);
}
@Override
protected INodeFileWithSnapshot toINodeFile(final long mtime) {
assertAllBlocksComplete();
final long atime = getModificationTime();
final INodeFileWithSnapshot f = new INodeFileWithSnapshot(this, getDiffs());
f.setModificationTime(mtime);
f.setAccessTime(atime);
return f;
}
@Override
public boolean isCurrentFileDeleted() {
return isCurrentFileDeleted;
}
@Override
public void deleteCurrentFile() {
isCurrentFileDeleted = true;
}
@Override
public INodeFileAttributes getSnapshotINode(Snapshot snapshot) {
return diffs.getSnapshotINode(snapshot, this);
}
@Override
public INodeFileUnderConstructionWithSnapshot recordModification(
final Snapshot latest, final INodeMap inodeMap)
throws QuotaExceededException {
if (isInLatestSnapshot(latest) && !shouldRecordInSrcSnapshot(latest)) {
diffs.saveSelf2Snapshot(latest, this, null);
}
return this;
}
@Override
public INodeFile asINodeFile() {
return this;
}
@Override
public FileDiffList getDiffs() {
return diffs;
}
@Override
public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior,
final BlocksMapUpdateInfo collectedBlocks,
final List<INode> removedINodes, final boolean countDiffChange)
throws QuotaExceededException {
if (snapshot == null) { // delete the current file
if (!isCurrentFileDeleted()) {
recordModification(prior, null);
deleteCurrentFile();
}
Util.collectBlocksAndClear(this, collectedBlocks, removedINodes);
return Quota.Counts.newInstance();
} else { // delete a snapshot
prior = getDiffs().updatePrior(snapshot, prior);
return diffs.deleteSnapshotDiff(snapshot, prior, this, collectedBlocks,
removedINodes, countDiffChange);
}
}
@Override
public String toDetailString() {
return super.toDetailString()
+ (isCurrentFileDeleted()? " (DELETED), ": ", ") + diffs;
}
}

View File

@ -21,7 +21,6 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes; import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
@ -47,15 +46,6 @@ public class INodeFileWithSnapshot extends INodeFile
this.diffs = diffs != null? diffs: new FileDiffList(); this.diffs = diffs != null? diffs: new FileDiffList();
} }
@Override
public INodeFileUnderConstructionWithSnapshot toUnderConstruction(
final String clientName,
final String clientMachine,
final DatanodeDescriptor clientNode) {
return new INodeFileUnderConstructionWithSnapshot(this,
clientName, clientMachine, clientNode, getDiffs());
}
@Override @Override
public boolean isCurrentFileDeleted() { public boolean isCurrentFileDeleted() {
return isCurrentFileDeleted; return isCurrentFileDeleted;

View File

@ -1113,10 +1113,11 @@ public class TestReplicationPolicy {
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0); assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
final BlockInfo info = new BlockInfo(block1, 1); final BlockInfo info = new BlockInfo(block1, 1);
final MutableBlockCollection mbc = mock(MutableBlockCollection.class); final BlockCollection mbc = mock(BlockCollection.class);
when(mbc.getLastBlock()).thenReturn(info); when(mbc.getLastBlock()).thenReturn(info);
when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1); when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
when(mbc.getBlockReplication()).thenReturn((short)1); when(mbc.getBlockReplication()).thenReturn((short)1);
when(mbc.isUnderConstruction()).thenReturn(true);
ContentSummary cs = mock(ContentSummary.class); ContentSummary cs = mock(ContentSummary.class);
when(cs.getLength()).thenReturn((long)1); when(cs.getLength()).thenReturn((long)1);
when(mbc.computeContentSummary()).thenReturn(cs); when(mbc.computeContentSummary()).thenReturn(cs);

View File

@ -82,9 +82,10 @@ public class CreateEditsLog {
blocks[iB].setBlockId(currentBlockId++); blocks[iB].setBlockId(currentBlockId++);
} }
INodeFileUnderConstruction inode = new INodeFileUnderConstruction( final INodeFile inode = new INodeFile(inodeId.nextValue(), null,
inodeId.nextValue(), null, replication, 0, blockSize, blocks, p, "", p, 0L, 0L, blocks, replication, blockSize);
"", null); inode.toUnderConstruction("", "", null);
// Append path to filename with information about blockIDs // Append path to filename with information about blockIDs
String path = "_" + iF + "_B" + blocks[0].getBlockId() + String path = "_" + iF + "_B" + blocks[0].getBlockId() +
"_to_B" + blocks[blocksPerFile-1].getBlockId() + "_"; "_to_B" + blocks[blocksPerFile-1].getBlockId() + "_";
@ -96,9 +97,10 @@ public class CreateEditsLog {
dirInode = new INodeDirectory(inodeId.nextValue(), null, p, 0L); dirInode = new INodeDirectory(inodeId.nextValue(), null, p, 0L);
editLog.logMkDir(currentDir, dirInode); editLog.logMkDir(currentDir, dirInode);
} }
editLog.logOpenFile(filePath, INodeFile fileUc = new INodeFile(inodeId.nextValue(), null,
new INodeFileUnderConstruction(inodeId.nextValue(), p, replication, p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
0, blockSize, "", "", null), false); fileUc.toUnderConstruction("", "", null);
editLog.logOpenFile(filePath, fileUc, false);
editLog.logCloseFile(filePath, inode); editLog.logCloseFile(filePath, inode);
if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks

View File

@ -43,8 +43,7 @@ public class TestCommitBlockSynchronization {
private static final long length = 200; private static final long length = 200;
private static final long genStamp = 300; private static final long genStamp = 300;
private FSNamesystem makeNameSystemSpy(Block block, private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
INodeFileUnderConstruction file)
throws IOException { throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
FSImage image = new FSImage(conf); FSImage image = new FSImage(conf);
@ -58,21 +57,26 @@ public class TestCommitBlockSynchronization {
blockInfo.setGenerationStamp(genStamp); blockInfo.setGenerationStamp(genStamp);
blockInfo.initializeBlockRecovery(genStamp); blockInfo.initializeBlockRecovery(genStamp);
doReturn(true).when(file).removeLastBlock(any(Block.class)); doReturn(true).when(file).removeLastBlock(any(Block.class));
doReturn(true).when(file).isUnderConstruction();
doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class)); doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class));
doReturn("").when(namesystemSpy).closeFileCommitBlocks( doReturn("").when(namesystemSpy).closeFileCommitBlocks(
any(INodeFileUnderConstruction.class), any(INodeFile.class), any(BlockInfo.class));
any(BlockInfo.class));
doReturn("").when(namesystemSpy).persistBlocks( doReturn("").when(namesystemSpy).persistBlocks(
any(INodeFileUnderConstruction.class), anyBoolean()); any(INodeFile.class), anyBoolean());
doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog(); doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
return namesystemSpy; return namesystemSpy;
} }
private INodeFile mockFileUnderConstruction() {
INodeFile file = mock(INodeFile.class);
return file;
}
@Test @Test
public void testCommitBlockSynchronization() throws IOException { public void testCommitBlockSynchronization() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class); INodeFile file = mockFileUnderConstruction();
Block block = new Block(blockId, length, genStamp); Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[0]; DatanodeID[] newTargets = new DatanodeID[0];
@ -100,7 +104,7 @@ public class TestCommitBlockSynchronization {
@Test @Test
public void testCommitBlockSynchronization2() throws IOException { public void testCommitBlockSynchronization2() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class); INodeFile file = mockFileUnderConstruction();
Block block = new Block(blockId, length, genStamp); Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[0]; DatanodeID[] newTargets = new DatanodeID[0];
@ -124,7 +128,7 @@ public class TestCommitBlockSynchronization {
@Test @Test
public void testCommitBlockSynchronizationWithDelete() throws IOException { public void testCommitBlockSynchronizationWithDelete() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class); INodeFile file = mockFileUnderConstruction();
Block block = new Block(blockId, length, genStamp); Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[0]; DatanodeID[] newTargets = new DatanodeID[0];
@ -144,7 +148,7 @@ public class TestCommitBlockSynchronization {
@Test @Test
public void testCommitBlockSynchronizationWithClose() throws IOException { public void testCommitBlockSynchronizationWithClose() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class); INodeFile file = mockFileUnderConstruction();
Block block = new Block(blockId, length, genStamp); Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[0]; DatanodeID[] newTargets = new DatanodeID[0];
@ -171,7 +175,7 @@ public class TestCommitBlockSynchronization {
@Test @Test
public void testCommitBlockSynchronizationWithCloseAndNonExistantTarget() public void testCommitBlockSynchronizationWithCloseAndNonExistantTarget()
throws IOException { throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class); INodeFile file = mockFileUnderConstruction();
Block block = new Block(blockId, length, genStamp); Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[]{ DatanodeID[] newTargets = new DatanodeID[]{

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@ -152,9 +153,10 @@ public class TestEditLog {
FSEditLog editLog = namesystem.getEditLog(); FSEditLog editLog = namesystem.getEditLog();
for (int i = 0; i < numTransactions; i++) { for (int i = 0; i < numTransactions; i++) {
INodeFileUnderConstruction inode = new INodeFileUnderConstruction( INodeFile inode = new INodeFile(namesystem.allocateNewInodeId(), null,
namesystem.allocateNewInodeId(), p, replication, blockSize, 0, "", p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
"", null); inode.toUnderConstruction("", "", null);
editLog.logOpenFile("/filename" + (startIndex + i), inode, false); editLog.logOpenFile("/filename" + (startIndex + i), inode, false);
editLog.logCloseFile("/filename" + (startIndex + i), inode); editLog.logCloseFile("/filename" + (startIndex + i), inode);
editLog.logSync(); editLog.logSync();

View File

@ -29,6 +29,8 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.util.Time;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -282,14 +285,6 @@ public class TestINodeFile {
assertTrue(fnfe.getMessage().contains("File does not exist")); assertTrue(fnfe.getMessage().contains("File does not exist"));
} }
//cast to INodeFileUnderConstruction, should fail
try {
INodeFileUnderConstruction.valueOf(from, path);
fail();
} catch(FileNotFoundException fnfe) {
assertTrue(fnfe.getMessage().contains("File does not exist"));
}
//cast to INodeDirectory, should fail //cast to INodeDirectory, should fail
try { try {
INodeDirectory.valueOf(from, path); INodeDirectory.valueOf(from, path);
@ -306,14 +301,6 @@ public class TestINodeFile {
final INodeFile f = INodeFile.valueOf(from, path); final INodeFile f = INodeFile.valueOf(from, path);
assertTrue(f == from); assertTrue(f == from);
//cast to INodeFileUnderConstruction, should fail
try {
INodeFileUnderConstruction.valueOf(from, path);
fail();
} catch(IOException ioe) {
assertTrue(ioe.getMessage().contains("File is not under construction"));
}
//cast to INodeDirectory, should fail //cast to INodeDirectory, should fail
try { try {
INodeDirectory.valueOf(from, path); INodeDirectory.valueOf(from, path);
@ -324,19 +311,14 @@ public class TestINodeFile {
} }
{//cast from INodeFileUnderConstruction {//cast from INodeFileUnderConstruction
final INode from = new INodeFileUnderConstruction( final INode from = new INodeFile(
INodeId.GRANDFATHER_INODE_ID, perm, replication, 0L, 0L, "client", INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, null, replication, 1024L);
"machine", null); from.asFile().toUnderConstruction("client", "machine", null);
//cast to INodeFile, should success //cast to INodeFile, should success
final INodeFile f = INodeFile.valueOf(from, path); final INodeFile f = INodeFile.valueOf(from, path);
assertTrue(f == from); assertTrue(f == from);
//cast to INodeFileUnderConstruction, should success
final INodeFileUnderConstruction u = INodeFileUnderConstruction.valueOf(
from, path);
assertTrue(u == from);
//cast to INodeDirectory, should fail //cast to INodeDirectory, should fail
try { try {
INodeDirectory.valueOf(from, path); INodeDirectory.valueOf(from, path);
@ -358,14 +340,6 @@ public class TestINodeFile {
assertTrue(fnfe.getMessage().contains("Path is not a file")); assertTrue(fnfe.getMessage().contains("Path is not a file"));
} }
//cast to INodeFileUnderConstruction, should fail
try {
INodeFileUnderConstruction.valueOf(from, path);
fail();
} catch(FileNotFoundException fnfe) {
assertTrue(fnfe.getMessage().contains("Path is not a file"));
}
//cast to INodeDirectory, should success //cast to INodeDirectory, should success
final INodeDirectory d = INodeDirectory.valueOf(from, path); final INodeDirectory d = INodeDirectory.valueOf(from, path);
assertTrue(d == from); assertTrue(d == from);
@ -1015,4 +989,24 @@ public class TestINodeFile {
} }
} }
} }
@Test
public void testFileUnderConstruction() {
replication = 3;
final INodeFile file = new INodeFile(INodeId.GRANDFATHER_INODE_ID, null,
perm, 0L, 0L, null, replication, 1024L);
assertFalse(file.isUnderConstruction());
final String clientName = "client";
final String clientMachine = "machine";
file.toUnderConstruction(clientName, clientMachine, null);
assertTrue(file.isUnderConstruction());
FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
assertEquals(clientName, uc.getClientName());
assertEquals(clientMachine, uc.getClientMachine());
Assert.assertNull(uc.getClientNode());
file.toCompleteFile(Time.now());
assertFalse(file.isUnderConstruction());
}
} }

View File

@ -66,7 +66,6 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -719,8 +718,8 @@ public class TestRetryCacheWithHA {
@Override @Override
boolean checkNamenodeBeforeReturn() throws Exception { boolean checkNamenodeBeforeReturn() throws Exception {
INodeFileUnderConstruction fileNode = (INodeFileUnderConstruction) cluster INodeFile fileNode = cluster.getNamesystem(0).getFSDirectory()
.getNamesystem(0).getFSDirectory().getINode4Write(file).asFile(); .getINode4Write(file).asFile();
BlockInfoUnderConstruction blkUC = BlockInfoUnderConstruction blkUC =
(BlockInfoUnderConstruction) (fileNode.getBlocks())[1]; (BlockInfoUnderConstruction) (fileNode.getBlocks())[1];
int datanodeNum = blkUC.getExpectedLocations().length; int datanodeNum = blkUC.getExpectedLocations().length;

View File

@ -1227,8 +1227,9 @@ public class TestRenameWithSnapshots {
out.write(content); out.write(content);
fooRef = fsdir.getINode4Write(foo2.toString()); fooRef = fsdir.getINode4Write(foo2.toString());
assertTrue(fooRef instanceof INodeReference.DstReference); assertTrue(fooRef instanceof INodeReference.DstReference);
INode fooNode = fooRef.asFile(); INodeFile fooNode = fooRef.asFile();
assertTrue(fooNode instanceof INodeFileUnderConstructionWithSnapshot); assertTrue(fooNode instanceof INodeFileWithSnapshot);
assertTrue(fooNode.isUnderConstruction());
} finally { } finally {
if (out != null) { if (out != null) {
out.close(); out.close();
@ -1237,8 +1238,9 @@ public class TestRenameWithSnapshots {
fooRef = fsdir.getINode4Write(foo2.toString()); fooRef = fsdir.getINode4Write(foo2.toString());
assertTrue(fooRef instanceof INodeReference.DstReference); assertTrue(fooRef instanceof INodeReference.DstReference);
INode fooNode = fooRef.asFile(); INodeFile fooNode = fooRef.asFile();
assertTrue(fooNode instanceof INodeFileWithSnapshot); assertTrue(fooNode instanceof INodeFileWithSnapshot);
assertFalse(fooNode.isUnderConstruction());
restartClusterAndCheckImage(true); restartClusterAndCheckImage(true);
} }

View File

@ -314,7 +314,9 @@ public class TestSnapshotBlocksMap {
assertEquals(BLOCKSIZE, blks[0].getNumBytes()); assertEquals(BLOCKSIZE, blks[0].getNumBytes());
} }
/** Make sure we delete 0-sized block when deleting an INodeFileUC */ /**
* Make sure we delete 0-sized block when deleting an under-construction file
*/
@Test @Test
public void testDeletionWithZeroSizeBlock2() throws Exception { public void testDeletionWithZeroSizeBlock2() throws Exception {
final Path foo = new Path("/foo"); final Path foo = new Path("/foo");