HDFS-5285. Flatten INodeFile hierarchy: Add UnderConstruction Feature. Merge change r1544389 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1568201 13f79535-47bb-0310-9956-ffa450edef68
parent f744409914
commit e1fd74bb98
@@ -39,6 +39,10 @@ Release 2.4.0 - UNRELEASED
     HDFS-5531. Combine the getNsQuota() and getDsQuota() methods in INode.
     (szetszwo)
 
+    HDFS-5285. Flatten INodeFile hierarchy: Replace INodeFileUnderConstruction
+    and INodeFileUnderConstructionWithSnapshot with FileUnderContructionFeature.
+    (jing9 via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
@@ -64,4 +64,21 @@ public interface BlockCollection {
    * Get the name of the collection.
    */
   public String getName();
+
+  /**
+   * Set the block at the given index.
+   */
+  public void setBlock(int index, BlockInfo blk);
+
+  /**
+   * Convert the last block of the collection to an under-construction block
+   * and set the locations.
+   */
+  public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
+      DatanodeStorageInfo[] locations) throws IOException;
+
+  /**
+   * @return whether the block collection is under construction.
+   */
+  public boolean isUnderConstruction();
 }
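The three methods added above fold the write-side operations of MutableBlockCollection into the base interface, so block-management code can query and mutate construction state without downcasting. A minimal sketch of the resulting call pattern (the helper method is hypothetical; the member signatures are from the hunk above):

    void reopenLastBlock(BlockCollection bc, DatanodeStorageInfo[] locations)
        throws IOException {
      // replaces the old "bc instanceof MutableBlockCollection" test
      if (bc.isUnderConstruction()) {
        // convert the last block to under construction and record replicas
        bc.setLastBlock(bc.getLastBlock(), locations);
      }
    }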
@@ -563,7 +563,7 @@ private static boolean commitBlock(final BlockInfoUnderConstruction block,
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  public boolean commitOrCompleteLastBlock(MutableBlockCollection bc,
+  public boolean commitOrCompleteLastBlock(BlockCollection bc,
       Block commitBlock) throws IOException {
     if(commitBlock == null)
       return false; // not committing, this is a block allocation retry
@@ -586,7 +586,7 @@ public boolean commitOrCompleteLastBlock(MutableBlockCollection bc,
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  private BlockInfo completeBlock(final MutableBlockCollection bc,
+  private BlockInfo completeBlock(final BlockCollection bc,
       final int blkIndex, boolean force) throws IOException {
     if(blkIndex < 0)
       return null;
@@ -619,7 +619,7 @@ private BlockInfo completeBlock(final MutableBlockCollection bc,
     return blocksMap.replaceBlock(completeBlock);
   }
 
-  private BlockInfo completeBlock(final MutableBlockCollection bc,
+  private BlockInfo completeBlock(final BlockCollection bc,
       final BlockInfo block, boolean force) throws IOException {
     BlockInfo[] fileBlocks = bc.getBlocks();
     for(int idx = 0; idx < fileBlocks.length; idx++)
@@ -634,7 +634,7 @@ private BlockInfo completeBlock(final MutableBlockCollection bc,
    * regardless of whether enough replicas are present. This is necessary
    * when tailing edit logs as a Standby.
    */
-  public BlockInfo forceCompleteBlock(final MutableBlockCollection bc,
+  public BlockInfo forceCompleteBlock(final BlockCollection bc,
       final BlockInfoUnderConstruction block) throws IOException {
     block.commitBlock(block);
     return completeBlock(bc, block, true);
@@ -655,7 +655,7 @@ public BlockInfo forceCompleteBlock(final MutableBlockCollection bc,
    * @return the last block locations if the block is partial or null otherwise
    */
   public LocatedBlock convertLastBlockToUnderConstruction(
-      MutableBlockCollection bc) throws IOException {
+      BlockCollection bc) throws IOException {
     BlockInfo oldBlock = bc.getLastBlock();
     if(oldBlock == null ||
         bc.getPreferredBlockSize() == oldBlock.getNumBytes())
@@ -1214,10 +1214,8 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
           // block should belong to a file
           bc = blocksMap.getBlockCollection(block);
           // abandoned block or block reopened for append
-          if(bc == null
-              || (bc instanceof MutableBlockCollection && block.equals(bc.getLastBlock()))) {
-            // remove from neededReplications
-            neededReplications.remove(block, priority);
+          if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+            neededReplications.remove(block, priority); // remove from neededReplications
             neededReplications.decrementReplicationIndex(priority);
             continue;
           }
@@ -1299,7 +1297,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
           // block should belong to a file
           bc = blocksMap.getBlockCollection(block);
           // abandoned block or block reopened for append
-          if(bc == null || (bc instanceof MutableBlockCollection && block.equals(bc.getLastBlock()))) {
+          if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
            neededReplications.remove(block, priority); // remove from neededReplications
             rw.targets = null;
             neededReplications.decrementReplicationIndex(priority);
@@ -2182,7 +2180,7 @@ private void addStoredBlockImmediate(BlockInfo storedBlock,
     int numCurrentReplica = countLiveNodes(storedBlock);
     if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
         && numCurrentReplica >= minReplication) {
-      completeBlock((MutableBlockCollection)storedBlock.getBlockCollection(), storedBlock, false);
+      completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
     } else if (storedBlock.isComplete()) {
       // check whether safe replication is reached for the block
       // only complete blocks are counted towards that.
@@ -2253,7 +2251,7 @@ private Block addStoredBlock(final BlockInfo block,
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
         numLiveReplicas >= minReplication) {
-      storedBlock = completeBlock((MutableBlockCollection)bc, storedBlock, false);
+      storedBlock = completeBlock(bc, storedBlock, false);
     } else if (storedBlock.isComplete()) {
       // check whether safe replication is reached for the block
       // only complete blocks are counted towards that
@@ -2264,7 +2262,7 @@ private Block addStoredBlock(final BlockInfo block,
     }
 
     // if file is under construction, then done for now
-    if (bc instanceof MutableBlockCollection) {
+    if (bc.isUnderConstruction()) {
       return storedBlock;
     }
 
@@ -2877,7 +2875,7 @@ private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
         + ", corrupt replicas: " + num.corruptReplicas()
         + ", decommissioned replicas: " + num.decommissionedReplicas()
         + ", excess replicas: " + num.excessReplicas()
-        + ", Is Open File: " + (bc instanceof MutableBlockCollection)
+        + ", Is Open File: " + bc.isUnderConstruction()
         + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
         + srcNode + ", Is current datanode decommissioning: "
         + srcNode.isDecommissionInProgress());
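Every change in the block-manager hunks above is the same substitution applied mechanically: being under construction becomes a state queried through BlockCollection rather than a subtype tested with instanceof, which also removes the unchecked downcasts. Schematically (calls taken from the hunks above):

    // before: completing a block required a cast to the mutable subtype
    //   completeBlock((MutableBlockCollection) bc, storedBlock, false);
    // after: the widened interface accepts the collection directly
    completeBlock(bc, storedBlock, false);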
@@ -276,13 +276,9 @@ void waitForReady() {
    * @throws UnresolvedLinkException
    * @throws SnapshotAccessControlException
    */
-  INodeFileUnderConstruction addFile(String path,
-      PermissionStatus permissions,
-      short replication,
-      long preferredBlockSize,
-      String clientName,
-      String clientMachine,
-      DatanodeDescriptor clientNode)
+  INodeFile addFile(String path, PermissionStatus permissions,
+      short replication, long preferredBlockSize, String clientName,
+      String clientMachine, DatanodeDescriptor clientNode)
     throws FileAlreadyExistsException, QuotaExceededException,
       UnresolvedLinkException, SnapshotAccessControlException {
     waitForReady();
@@ -300,11 +296,11 @@ INodeFileUnderConstruction addFile(String path,
     if (!mkdirs(parent.toString(), permissions, true, modTime)) {
       return null;
     }
-    INodeFileUnderConstruction newNode = new INodeFileUnderConstruction(
-        namesystem.allocateNewInodeId(),
-        permissions,replication,
-        preferredBlockSize, modTime, clientName,
-        clientMachine, clientNode);
+    INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null,
+        permissions, modTime, modTime, BlockInfo.EMPTY_ARRAY, replication,
+        preferredBlockSize);
+    newNode.toUnderConstruction(clientName, clientMachine, clientNode);
+
     boolean added = false;
     writeLock();
     try {
@@ -336,8 +332,11 @@ INodeFile unprotectedAddFile( long id,
     final INodeFile newNode;
     assert hasWriteLock();
     if (underConstruction) {
-      newNode = new INodeFileUnderConstruction(id, permissions, replication,
-          preferredBlockSize, modificationTime, clientName, clientMachine, null);
+      newNode = new INodeFile(id, null, permissions, modificationTime,
+          modificationTime, BlockInfo.EMPTY_ARRAY, replication,
+          preferredBlockSize);
+      newNode.toUnderConstruction(clientName, clientMachine, null);
+
     } else {
       newNode = new INodeFile(id, null, permissions, modificationTime, atime,
           BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize);
@@ -366,8 +365,8 @@ BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
 
     writeLock();
     try {
-      final INodeFileUnderConstruction fileINode =
-          INodeFileUnderConstruction.valueOf(inodesInPath.getLastINode(), path);
+      final INodeFile fileINode = inodesInPath.getLastINode().asFile();
+      Preconditions.checkState(fileINode.isUnderConstruction());
 
       // check quota limits and updated space consumed
       updateCount(inodesInPath, 0, fileINode.getBlockDiskspace(), true);
@@ -397,8 +396,8 @@ BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
   /**
    * Persist the block list for the inode.
    */
-  void persistBlocks(String path, INodeFileUnderConstruction file,
-      boolean logRetryCache) {
+  void persistBlocks(String path, INodeFile file, boolean logRetryCache) {
+    Preconditions.checkArgument(file.isUnderConstruction());
     waitForReady();
 
     writeLock();
@@ -417,7 +416,8 @@ void persistBlocks(String path, INodeFileUnderConstruction file,
   /**
    * Persist the new block (the last block of the given file).
    */
-  void persistNewBlock(String path, INodeFileUnderConstruction file) {
+  void persistNewBlock(String path, INodeFile file) {
+    Preconditions.checkArgument(file.isUnderConstruction());
     waitForReady();
 
     writeLock();
@@ -456,8 +456,9 @@ void closeFile(String path, INodeFile file) {
    * Remove a block from the file.
    * @return Whether the block exists in the corresponding file
    */
-  boolean removeBlock(String path, INodeFileUnderConstruction fileNode,
-      Block block) throws IOException {
+  boolean removeBlock(String path, INodeFile fileNode, Block block)
+      throws IOException {
+    Preconditions.checkArgument(fileNode.isUnderConstruction());
     waitForReady();
 
     writeLock();
@@ -469,7 +470,8 @@ boolean removeBlock(String path, INodeFileUnderConstruction fileNode,
   }
 
   boolean unprotectedRemoveBlock(String path,
-      INodeFileUnderConstruction fileNode, Block block) throws IOException {
+      INodeFile fileNode, Block block) throws IOException {
+    Preconditions.checkArgument(fileNode.isUnderConstruction());
     // modify file-> block and blocksMap
     boolean removed = fileNode.removeLastBlock(block);
     if (!removed) {
@@ -1496,38 +1498,6 @@ private static void checkSnapshot(INode target,
     }
   }
 
-  /**
-   * Replaces the specified INodeFile with the specified one.
-   */
-  void replaceINodeFile(String path, INodeFile oldnode,
-      INodeFile newnode) throws IOException {
-    writeLock();
-    try {
-      unprotectedReplaceINodeFile(path, oldnode, newnode);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /** Replace an INodeFile and record modification for the latest snapshot. */
-  void unprotectedReplaceINodeFile(final String path, final INodeFile oldnode,
-      final INodeFile newnode) {
-    Preconditions.checkState(hasWriteLock());
-
-    oldnode.getParent().replaceChild(oldnode, newnode, inodeMap);
-    oldnode.clear();
-
-    /* Currently oldnode and newnode are assumed to contain the same
-     * blocks. Otherwise, blocks need to be removed from the blocksMap.
-     */
-    int index = 0;
-    for (BlockInfo b : newnode.getBlocks()) {
-      BlockInfo info = getBlockManager().addBlockCollection(b, newnode);
-      newnode.setBlock(index, info); // inode refers to the block in BlocksMap
-      index++;
-    }
-  }
-
   /**
    * Get a partial listing of the indicated directory
    *
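Deleting replaceINodeFile/unprotectedReplaceINodeFile above is the point of the whole patch: opening a file for write no longer swaps one inode object for another (and re-links its blocks in the blocks map), it toggles a feature on the same inode. A minimal sketch of the new lifecycle, with placeholder argument values (toCompleteFile appears in a later hunk):

    INodeFile node = new INodeFile(namesystem.allocateNewInodeId(), null,
        permissions, mtime, mtime, BlockInfo.EMPTY_ARRAY, replication,
        preferredBlockSize);
    node.toUnderConstruction(clientName, clientMachine, clientNode); // open
    // ... blocks are allocated and written ...
    node.toCompleteFile(node.getModificationTime());                 // close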
@@ -679,8 +679,8 @@ private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) {
    * Add open lease record to edit log.
    * Records the block locations of the last block.
    */
-  public void logOpenFile(String path, INodeFileUnderConstruction newNode,
-      boolean toLogRpcIds) {
+  public void logOpenFile(String path, INodeFile newNode, boolean toLogRpcIds) {
+    Preconditions.checkArgument(newNode.isUnderConstruction());
     AddOp op = AddOp.getInstance(cache.get())
       .setInodeId(newNode.getId())
       .setPath(path)
@@ -690,8 +690,8 @@ public void logOpenFile(String path, INodeFileUnderConstruction newNode,
       .setBlockSize(newNode.getPreferredBlockSize())
       .setBlocks(newNode.getBlocks())
       .setPermissionStatus(newNode.getPermissionStatus())
-      .setClientName(newNode.getClientName())
-      .setClientMachine(newNode.getClientMachine());
+      .setClientName(newNode.getFileUnderConstructionFeature().getClientName())
+      .setClientMachine(newNode.getFileUnderConstructionFeature().getClientMachine());
     logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
@@ -712,7 +712,8 @@ public void logCloseFile(String path, INodeFile newNode) {
     logEdit(op);
   }
 
-  public void logAddBlock(String path, INodeFileUnderConstruction file) {
+  public void logAddBlock(String path, INodeFile file) {
+    Preconditions.checkArgument(file.isUnderConstruction());
     BlockInfo[] blocks = file.getBlocks();
     Preconditions.checkState(blocks != null && blocks.length > 0);
     BlockInfo pBlock = blocks.length > 1 ? blocks[blocks.length - 2] : null;
@@ -722,8 +723,8 @@ public void logAddBlock(String path, INodeFileUnderConstruction file) {
     logEdit(op);
   }
 
-  public void logUpdateBlocks(String path, INodeFileUnderConstruction file,
-      boolean toLogRpcIds) {
+  public void logUpdateBlocks(String path, INodeFile file, boolean toLogRpcIds) {
+    Preconditions.checkArgument(file.isUnderConstruction());
     UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
       .setPath(path)
       .setBlocks(file.getBlocks());
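With the subclass gone, what used to be a compile-time guarantee — these log methods only ever saw an INodeFileUnderConstruction — becomes a runtime precondition, and the per-client fields are reached through the feature object. The recurring shape, as a condensed sketch of the calls in the hunks above:

    Preconditions.checkArgument(file.isUnderConstruction());
    FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
    String client  = uc.getClientName();     // was: file.getClientName()
    String machine = uc.getClientMachine();  // was: file.getClientMachine()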
@@ -25,7 +25,6 @@
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.EnumMap;
-import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -33,21 +32,20 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
@@ -60,11 +58,11 @@
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisallowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCacheDirectiveInfoOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -88,7 +86,6 @@
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import org.apache.hadoop.hdfs.util.Holder;
-import org.apache.jasper.tagplugins.jstl.core.Remove;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -379,15 +376,15 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
       }
 
       final INodesInPath iip = fsDir.getLastINodeInPath(path);
-      final INodeFile oldFile = INodeFile.valueOf(iip.getINode(0), path);
+      final INodeFile file = INodeFile.valueOf(iip.getINode(0), path);
 
       // Update the salient file attributes.
-      oldFile.setAccessTime(addCloseOp.atime, null, fsDir.getINodeMap());
-      oldFile.setModificationTime(addCloseOp.mtime, null, fsDir.getINodeMap());
-      updateBlocks(fsDir, addCloseOp, oldFile);
+      file.setAccessTime(addCloseOp.atime, null, fsDir.getINodeMap());
+      file.setModificationTime(addCloseOp.mtime, null, fsDir.getINodeMap());
+      updateBlocks(fsDir, addCloseOp, file);
 
       // Now close the file
-      if (!oldFile.isUnderConstruction() &&
+      if (!file.isUnderConstruction() &&
           logVersion <= LayoutVersion.BUGFIX_HDFS_2991_VERSION) {
         // There was a bug (HDFS-2991) in hadoop < 0.23.1 where OP_CLOSE
         // could show up twice in a row. But after that version, this
@@ -397,11 +394,9 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
       }
       // One might expect that you could use removeLease(holder, path) here,
       // but OP_CLOSE doesn't serialize the holder. So, remove by path.
-      if (oldFile.isUnderConstruction()) {
-        INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
+      if (file.isUnderConstruction()) {
         fsNamesys.leaseManager.removeLeaseWithPrefixPath(path);
-        INodeFile newFile = ucFile.toINodeFile(ucFile.getModificationTime());
-        fsDir.unprotectedReplaceINodeFile(path, ucFile, newFile);
+        file.toCompleteFile(file.getModificationTime());
       }
       break;
     }
@@ -608,9 +603,8 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
           reassignLeaseOp.leaseHolder);
       final String path =
           renameReservedPathsOnUpgrade(reassignLeaseOp.path, logVersion);
-      INodeFileUnderConstruction pendingFile =
-          INodeFileUnderConstruction.valueOf(
-              fsDir.getINode(path), path);
+      INodeFile pendingFile = fsDir.getINode(path).asFile();
+      Preconditions.checkState(pendingFile.isUnderConstruction());
       fsNamesys.reassignLeaseInternal(lease,
           path, reassignLeaseOp.newHolder, pendingFile);
       break;
@@ -795,8 +789,7 @@ private void addNewBlock(FSDirectory fsDir, AddBlockOp op, INodeFile file)
 
       oldLastBlock.setNumBytes(pBlock.getNumBytes());
       if (oldLastBlock instanceof BlockInfoUnderConstruction) {
-        fsNamesys.getBlockManager().forceCompleteBlock(
-            (INodeFileUnderConstruction) file,
+        fsNamesys.getBlockManager().forceCompleteBlock(file,
             (BlockInfoUnderConstruction) oldLastBlock);
         fsNamesys.getBlockManager().processQueuedMessagesForBlock(pBlock);
       }
@@ -848,8 +841,7 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
       if (oldBlock instanceof BlockInfoUnderConstruction &&
           (!isLastBlock || op.shouldCompleteLastBlock())) {
         changeMade = true;
-        fsNamesys.getBlockManager().forceCompleteBlock(
-            (INodeFileUnderConstruction)file,
+        fsNamesys.getBlockManager().forceCompleteBlock(file,
             (BlockInfoUnderConstruction) oldBlock);
       }
       if (changeMade) {
@@ -871,8 +863,7 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
             + path);
       }
       Block oldBlock = oldBlocks[oldBlocks.length - 1];
-      boolean removed = fsDir.unprotectedRemoveBlock(path,
-          (INodeFileUnderConstruction) file, oldBlock);
+      boolean removed = fsDir.unprotectedRemoveBlock(path, file, oldBlock);
       if (!removed && !(op instanceof UpdateBlocksOp)) {
         throw new IOException("Trying to delete non-existant block " + oldBlock);
       }
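The OP_CLOSE replay above shows the behavioral simplification: closing a file used to construct a second inode and splice it into the namespace, which is exactly what the now-deleted unprotectedReplaceINodeFile existed for. The replacement completes the same inode in place (condensed from the hunk above):

    if (file.isUnderConstruction()) {
      // OP_CLOSE does not serialize the holder, so the lease is removed by path
      fsNamesys.leaseManager.removeLeaseWithPrefixPath(path);
      file.toCompleteFile(file.getModificationTime()); // no inode replacement
    }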
@@ -60,7 +60,6 @@
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
@@ -675,14 +674,11 @@ INode loadINode(final byte[] localName, boolean isSnapshotINode,
       // file
 
       // read blocks
-      BlockInfo[] blocks = null;
-      if (numBlocks >= 0) {
-        blocks = new BlockInfo[numBlocks];
+      BlockInfo[] blocks = new BlockInfo[numBlocks];
       for (int j = 0; j < numBlocks; j++) {
         blocks[j] = new BlockInfo(replication);
         blocks[j].readFields(in);
       }
-      }
 
       String clientName = "";
       String clientMachine = "";
@@ -716,10 +712,9 @@ INode loadINode(final byte[] localName, boolean isSnapshotINode,
       final INodeFile file = new INodeFile(inodeId, localName, permissions,
           modificationTime, atime, blocks, replication, blockSize);
       if (underConstruction) {
-        INodeFileUnderConstruction fileUC = new INodeFileUnderConstruction(
-            file, clientName, clientMachine, null);
-        return fileDiffs == null ? fileUC :
-            new INodeFileUnderConstructionWithSnapshot(fileUC, fileDiffs);
+        file.toUnderConstruction(clientName, clientMachine, null);
+        return fileDiffs == null ? file : new INodeFileWithSnapshot(file,
+            fileDiffs);
       } else {
         return fileDiffs == null ? file :
             new INodeFileWithSnapshot(file, fileDiffs);
@@ -848,8 +843,8 @@ private void loadFilesUnderConstruction(DataInput in,
     LOG.info("Number of files under construction = " + size);
 
     for (int i = 0; i < size; i++) {
-      INodeFileUnderConstruction cons = FSImageSerialization
-          .readINodeUnderConstruction(in, namesystem, getLayoutVersion());
+      INodeFile cons = FSImageSerialization.readINodeUnderConstruction(in,
+          namesystem, getLayoutVersion());
       counter.increment();
 
       // verify that file exists in namespace
@@ -868,32 +863,20 @@ private void loadFilesUnderConstruction(DataInput in,
         oldnode = INodeFile.valueOf(iip.getINode(0), path);
       }
 
-      cons.setLocalName(oldnode.getLocalNameBytes());
-      INodeReference parentRef = oldnode.getParentReference();
-      if (parentRef != null) {
-        cons.setParentReference(parentRef);
-      } else {
-        cons.setParent(oldnode.getParent());
-      }
-
-      if (oldnode instanceof INodeFileWithSnapshot) {
-        cons = new INodeFileUnderConstructionWithSnapshot(cons,
-            ((INodeFileWithSnapshot) oldnode).getDiffs());
+      FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();
+      oldnode.toUnderConstruction(uc.getClientName(), uc.getClientMachine(),
+          uc.getClientNode());
+      if (oldnode.numBlocks() > 0) {
+        BlockInfo ucBlock = cons.getLastBlock();
+        // we do not replace the inode, just replace the last block of oldnode
+        BlockInfo info = namesystem.getBlockManager().addBlockCollection(
+            ucBlock, oldnode);
+        oldnode.setBlock(oldnode.numBlocks() - 1, info);
       }
 
       if (!inSnapshot) {
-        fsDir.replaceINodeFile(path, oldnode, cons);
-        namesystem.leaseManager.addLease(cons.getClientName(), path);
-      } else {
-        if (parentRef != null) {
-          // replace oldnode with cons
-          parentRef.setReferredINode(cons);
-        } else {
-          // replace old node in its parent's children list and deleted list
-          oldnode.getParent().replaceChildFileInSnapshot(oldnode, cons);
-          namesystem.dir.addToInodeMap(cons);
-          updateBlocksMap(cons);
-        }
+        namesystem.leaseManager.addLease(cons
+            .getFileUnderConstructionFeature().getClientName(), path);
       }
     }
   }
@@ -1123,8 +1106,8 @@ static class Saver {
     private MD5Hash savedDigest;
     private final ReferenceMap referenceMap = new ReferenceMap();
 
-    private final Map<Long, INodeFileUnderConstruction> snapshotUCMap =
-        new HashMap<Long, INodeFileUnderConstruction>();
+    private final Map<Long, INodeFile> snapshotUCMap =
+        new HashMap<Long, INodeFile>();
 
     /** @throws IllegalStateException if the instance has not yet saved an image */
     private void checkSaved() {
@@ -1265,8 +1248,7 @@ private int saveChildren(ReadOnlyList<INode> children,
           dirNum++;
         } else if (inSnapshot && child.isFile()
             && child.asFile().isUnderConstruction()) {
-          this.snapshotUCMap.put(child.getId(),
-              (INodeFileUnderConstruction) child.asFile());
+          this.snapshotUCMap.put(child.getId(), child.asFile());
         }
         if (i++ % 50 == 0) {
           context.checkCancelled();
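The loadINode hunk above also shows why INodeFileUnderConstructionWithSnapshot could be deleted outright: under-construction state now composes with the snapshot wrapper instead of multiplying the class hierarchy. Schematically (from the hunk above):

    // before: the UC-and-snapshot combination needed its own subclass
    //   new INodeFileUnderConstructionWithSnapshot(fileUC, fileDiffs)
    // after: attach the feature, then wrap for snapshots as usual
    file.toUnderConstruction(clientName, clientMachine, null);
    INodeFile result = fileDiffs == null
        ? file : new INodeFileWithSnapshot(file, fileDiffs);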
@@ -115,7 +115,7 @@ private static void writeBlocks(final Block[] blocks,
   // Helper function that reads in an INodeUnderConstruction
   // from the input stream
   //
-  static INodeFileUnderConstruction readINodeUnderConstruction(
+  static INodeFile readINodeUnderConstruction(
       DataInput in, FSNamesystem fsNamesys, int imgVersion)
       throws IOException {
     byte[] name = readBytes(in);
@@ -148,25 +148,17 @@ static INodeFileUnderConstruction readINodeUnderConstruction(
     int numLocs = in.readInt();
     assert numLocs == 0 : "Unexpected block locations";
 
-    return new INodeFileUnderConstruction(inodeId,
-        name,
-        blockReplication,
-        modificationTime,
-        preferredBlockSize,
-        blocks,
-        perm,
-        clientName,
-        clientMachine,
-        null);
+    INodeFile file = new INodeFile(inodeId, name, perm, modificationTime,
+        modificationTime, blocks, blockReplication, preferredBlockSize);
+    file.toUnderConstruction(clientName, clientMachine, null);
+    return file;
   }
 
   // Helper function that writes an INodeUnderConstruction
   // into the input stream
   //
-  static void writeINodeUnderConstruction(DataOutputStream out,
-      INodeFileUnderConstruction cons,
-      String path)
-      throws IOException {
+  static void writeINodeUnderConstruction(DataOutputStream out, INodeFile cons,
+      String path) throws IOException {
     writeString(path, out);
     out.writeLong(cons.getId());
     out.writeShort(cons.getFileReplication());
@@ -176,8 +168,9 @@ static void writeINodeUnderConstruction(DataOutputStream out,
     writeBlocks(cons.getBlocks(), out);
     cons.getPermissionStatus().write(out);
 
-    writeString(cons.getClientName(), out);
-    writeString(cons.getClientMachine(), out);
+    FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();
+    writeString(uc.getClientName(), out);
+    writeString(uc.getClientMachine(), out);
 
     out.writeInt(0); // do not store locations of last block
   }
@@ -201,9 +194,9 @@ public static void writeINodeFile(INodeFile file, DataOutput out,
     SnapshotFSImageFormat.saveFileDiffList(file, out);
 
     if (writeUnderConstruction) {
-      if (file instanceof INodeFileUnderConstruction) {
+      if (file.isUnderConstruction()) {
         out.writeBoolean(true);
-        final INodeFileUnderConstruction uc = (INodeFileUnderConstruction)file;
+        final FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
         writeString(uc.getClientName(), out);
         writeString(uc.getClientMachine(), out);
       } else {
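Nothing changes on disk in the serialization hunks above — the same fields are written in the same order — only the in-memory type handed around differs. A sketch of the round trip under that assumption, as called from within the serialization class (stream setup elided):

    INodeFile file = readINodeUnderConstruction(in, fsNamesys, imgVersion);
    assert file.isUnderConstruction();            // reader attaches the feature
    writeINodeUnderConstruction(out, file, path); // writer reads it back off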
@ -2214,13 +2214,14 @@ private void startFileInternal(FSPermissionChecker pc, String src,
|
|||||||
final DatanodeDescriptor clientNode =
|
final DatanodeDescriptor clientNode =
|
||||||
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
|
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
|
||||||
|
|
||||||
INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
|
INodeFile newNode = dir.addFile(src, permissions, replication, blockSize,
|
||||||
replication, blockSize, holder, clientMachine, clientNode);
|
holder, clientMachine, clientNode);
|
||||||
if (newNode == null) {
|
if (newNode == null) {
|
||||||
throw new IOException("DIR* NameSystem.startFile: " +
|
throw new IOException("DIR* NameSystem.startFile: " +
|
||||||
"Unable to add file to namespace.");
|
"Unable to add file to namespace.");
|
||||||
}
|
}
|
||||||
leaseManager.addLease(newNode.getClientName(), src);
|
leaseManager.addLease(newNode.getFileUnderConstructionFeature()
|
||||||
|
.getClientName(), src);
|
||||||
|
|
||||||
// record file record in log, record new generation stamp
|
// record file record in log, record new generation stamp
|
||||||
getEditLog().logOpenFile(src, newNode, logRetryEntry);
|
getEditLog().logOpenFile(src, newNode, logRetryEntry);
|
||||||
@ -2312,11 +2313,11 @@ LocatedBlock prepareFileForWrite(String src, INodeFile file,
|
|||||||
boolean writeToEditLog, Snapshot latestSnapshot, boolean logRetryCache)
|
boolean writeToEditLog, Snapshot latestSnapshot, boolean logRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
file = file.recordModification(latestSnapshot, dir.getINodeMap());
|
file = file.recordModification(latestSnapshot, dir.getINodeMap());
|
||||||
final INodeFileUnderConstruction cons = file.toUnderConstruction(
|
final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine,
|
||||||
leaseHolder, clientMachine, clientNode);
|
clientNode);
|
||||||
|
|
||||||
dir.replaceINodeFile(src, file, cons);
|
leaseManager.addLease(cons.getFileUnderConstructionFeature()
|
||||||
leaseManager.addLease(cons.getClientName(), src);
|
.getClientName(), src);
|
||||||
|
|
||||||
LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
|
LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
|
||||||
if (writeToEditLog) {
|
if (writeToEditLog) {
|
||||||
@ -2379,7 +2380,6 @@ private void recoverLeaseInternal(INodeFile fileInode,
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
if (fileInode != null && fileInode.isUnderConstruction()) {
|
if (fileInode != null && fileInode.isUnderConstruction()) {
|
||||||
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
|
|
||||||
//
|
//
|
||||||
// If the file is under construction , then it must be in our
|
// If the file is under construction , then it must be in our
|
||||||
// leases. Find the appropriate lease record.
|
// leases. Find the appropriate lease record.
|
||||||
@ -2402,7 +2402,9 @@ private void recoverLeaseInternal(INodeFile fileInode,
|
|||||||
//
|
//
|
||||||
// Find the original holder.
|
// Find the original holder.
|
||||||
//
|
//
|
||||||
lease = leaseManager.getLease(pendingFile.getClientName());
|
FileUnderConstructionFeature uc = fileInode.getFileUnderConstructionFeature();
|
||||||
|
String clientName = uc.getClientName();
|
||||||
|
lease = leaseManager.getLease(clientName);
|
||||||
if (lease == null) {
|
if (lease == null) {
|
||||||
throw new AlreadyBeingCreatedException(
|
throw new AlreadyBeingCreatedException(
|
||||||
"failed to create file " + src + " for " + holder +
|
"failed to create file " + src + " for " + holder +
|
||||||
@ -2413,26 +2415,26 @@ private void recoverLeaseInternal(INodeFile fileInode,
|
|||||||
// close now: no need to wait for soft lease expiration and
|
// close now: no need to wait for soft lease expiration and
|
||||||
// close only the file src
|
// close only the file src
|
||||||
LOG.info("recoverLease: " + lease + ", src=" + src +
|
LOG.info("recoverLease: " + lease + ", src=" + src +
|
||||||
" from client " + pendingFile.getClientName());
|
" from client " + clientName);
|
||||||
internalReleaseLease(lease, src, holder);
|
internalReleaseLease(lease, src, holder);
|
||||||
} else {
|
} else {
|
||||||
assert lease.getHolder().equals(pendingFile.getClientName()) :
|
assert lease.getHolder().equals(clientName) :
|
||||||
"Current lease holder " + lease.getHolder() +
|
"Current lease holder " + lease.getHolder() +
|
||||||
" does not match file creator " + pendingFile.getClientName();
|
" does not match file creator " + clientName;
|
||||||
//
|
//
|
||||||
// If the original holder has not renewed in the last SOFTLIMIT
|
// If the original holder has not renewed in the last SOFTLIMIT
|
||||||
// period, then start lease recovery.
|
// period, then start lease recovery.
|
||||||
//
|
//
|
||||||
if (lease.expiredSoftLimit()) {
|
if (lease.expiredSoftLimit()) {
|
||||||
LOG.info("startFile: recover " + lease + ", src=" + src + " client "
|
LOG.info("startFile: recover " + lease + ", src=" + src + " client "
|
||||||
+ pendingFile.getClientName());
|
+ clientName);
|
||||||
boolean isClosed = internalReleaseLease(lease, src, null);
|
boolean isClosed = internalReleaseLease(lease, src, null);
|
||||||
if(!isClosed)
|
if(!isClosed)
|
||||||
throw new RecoveryInProgressException(
|
throw new RecoveryInProgressException(
|
||||||
"Failed to close file " + src +
|
"Failed to close file " + src +
|
||||||
". Lease recovery is in progress. Try again later.");
|
". Lease recovery is in progress. Try again later.");
|
||||||
} else {
|
} else {
|
||||||
final BlockInfo lastBlock = pendingFile.getLastBlock();
|
final BlockInfo lastBlock = fileInode.getLastBlock();
|
||||||
if (lastBlock != null
|
if (lastBlock != null
|
||||||
&& lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
|
&& lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
|
||||||
throw new RecoveryInProgressException("Recovery in progress, file ["
|
throw new RecoveryInProgressException("Recovery in progress, file ["
|
||||||
@ -2441,8 +2443,8 @@ private void recoverLeaseInternal(INodeFile fileInode,
|
|||||||
throw new AlreadyBeingCreatedException("Failed to create file ["
|
throw new AlreadyBeingCreatedException("Failed to create file ["
|
||||||
+ src + "] for [" + holder + "] on client [" + clientMachine
|
+ src + "] for [" + holder + "] on client [" + clientMachine
|
||||||
+ "], because this file is already being created by ["
|
+ "], because this file is already being created by ["
|
||||||
+ pendingFile.getClientName() + "] on ["
|
+ clientName + "] on ["
|
||||||
+ pendingFile.getClientMachine() + "]");
|
+ uc.getClientMachine() + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2572,8 +2574,7 @@ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
|
|||||||
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
||||||
final INode[] inodes = analyzeFileState(
|
final INode[] inodes = analyzeFileState(
|
||||||
src, fileId, clientName, previous, onRetryBlock).getINodes();
|
src, fileId, clientName, previous, onRetryBlock).getINodes();
|
||||||
final INodeFileUnderConstruction pendingFile =
|
final INodeFile pendingFile = inodes[inodes.length - 1].asFile();
|
||||||
(INodeFileUnderConstruction) inodes[inodes.length - 1].asFile();
|
|
||||||
|
|
||||||
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
|
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
|
||||||
// This is a retry. Just return the last block if having locations.
|
// This is a retry. Just return the last block if having locations.
|
||||||
@ -2586,7 +2587,7 @@ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
|
|||||||
+ maxBlocksPerFile);
|
+ maxBlocksPerFile);
|
||||||
}
|
}
|
||||||
blockSize = pendingFile.getPreferredBlockSize();
|
blockSize = pendingFile.getPreferredBlockSize();
|
||||||
clientNode = pendingFile.getClientNode();
|
clientNode = pendingFile.getFileUnderConstructionFeature().getClientNode();
|
||||||
replication = pendingFile.getFileReplication();
|
replication = pendingFile.getFileReplication();
|
||||||
} finally {
|
} finally {
|
||||||
readUnlock();
|
readUnlock();
|
||||||
@ -2610,8 +2611,7 @@ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
|
|||||||
INodesInPath inodesInPath =
|
INodesInPath inodesInPath =
|
||||||
analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
|
analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
|
||||||
final INode[] inodes = inodesInPath.getINodes();
|
final INode[] inodes = inodesInPath.getINodes();
|
||||||
final INodeFileUnderConstruction pendingFile =
|
final INodeFile pendingFile = inodes[inodes.length - 1].asFile();
|
||||||
(INodeFileUnderConstruction) inodes[inodes.length - 1].asFile();
|
|
||||||
|
|
||||||
if (onRetryBlock[0] != null) {
|
if (onRetryBlock[0] != null) {
|
||||||
if (onRetryBlock[0].getLocations().length > 0) {
|
if (onRetryBlock[0].getLocations().length > 0) {
|
||||||
@ -2664,7 +2664,7 @@ INodesInPath analyzeFileState(String src,
|
|||||||
|
|
||||||
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
|
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
|
||||||
final INodesInPath iip = dir.getINodesInPath4Write(src);
|
final INodesInPath iip = dir.getINodesInPath4Write(src);
|
||||||
final INodeFileUnderConstruction pendingFile
|
final INodeFile pendingFile
|
||||||
= checkLease(src, fileId, clientName, iip.getLastINode());
|
= checkLease(src, fileId, clientName, iip.getLastINode());
|
||||||
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
|
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
|
||||||
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
|
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
|
||||||
@ -2770,8 +2770,8 @@ LocatedBlock getAdditionalDatanode(String src, final ExtendedBlock blk,
|
|||||||
src = FSDirectory.resolvePath(src, pathComponents, dir);
|
src = FSDirectory.resolvePath(src, pathComponents, dir);
|
||||||
|
|
||||||
//check lease
|
//check lease
|
||||||
final INodeFileUnderConstruction file = checkLease(src, clientName);
|
final INodeFile file = checkLease(src, clientName);
|
||||||
clientnode = file.getClientNode();
|
clientnode = file.getFileUnderConstructionFeature().getClientNode();
|
||||||
preferredblocksize = file.getPreferredBlockSize();
|
preferredblocksize = file.getPreferredBlockSize();
|
||||||
|
|
||||||
//find datanode storages
|
//find datanode storages
|
||||||
@ -2812,7 +2812,7 @@ boolean abandonBlock(ExtendedBlock b, String src, String holder)
|
|||||||
//
|
//
|
||||||
// Remove the block from the pending creates list
|
// Remove the block from the pending creates list
|
||||||
//
|
//
|
||||||
INodeFileUnderConstruction file = checkLease(src, holder);
|
INodeFile file = checkLease(src, holder);
|
||||||
boolean removed = dir.removeBlock(src, file,
|
boolean removed = dir.removeBlock(src, file,
|
||||||
ExtendedBlock.getLocalBlock(b));
|
ExtendedBlock.getLocalBlock(b));
|
||||||
if (!removed) {
|
if (!removed) {
|
||||||
@ -2832,16 +2832,15 @@ boolean abandonBlock(ExtendedBlock b, String src, String holder)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** make sure that we still have the lease on this file. */
|
/** make sure that we still have the lease on this file. */
|
||||||
private INodeFileUnderConstruction checkLease(String src, String holder)
|
private INodeFile checkLease(String src, String holder)
|
||||||
throws LeaseExpiredException, UnresolvedLinkException,
|
throws LeaseExpiredException, UnresolvedLinkException,
|
||||||
FileNotFoundException {
|
FileNotFoundException {
|
||||||
return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder,
|
return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder,
|
||||||
dir.getINode(src));
|
dir.getINode(src));
|
||||||
}
|
}
|
||||||
|
|
||||||
private INodeFileUnderConstruction checkLease(String src, long fileId,
|
private INodeFile checkLease(String src, long fileId, String holder,
|
||||||
String holder, INode inode) throws LeaseExpiredException,
|
INode inode) throws LeaseExpiredException, FileNotFoundException {
|
||||||
FileNotFoundException {
|
|
||||||
assert hasReadLock();
|
assert hasReadLock();
|
||||||
if (inode == null || !inode.isFile()) {
|
if (inode == null || !inode.isFile()) {
|
||||||
Lease lease = leaseManager.getLease(holder);
|
Lease lease = leaseManager.getLease(holder);
|
||||||
@ -2858,13 +2857,13 @@ private INodeFileUnderConstruction checkLease(String src, long fileId,
|
|||||||
+ (lease != null ? lease.toString()
|
+ (lease != null ? lease.toString()
|
||||||
: "Holder " + holder + " does not have any open files."));
|
: "Holder " + holder + " does not have any open files."));
|
||||||
}
|
}
|
||||||
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
|
String clientName = file.getFileUnderConstructionFeature().getClientName();
|
||||||
if (holder != null && !pendingFile.getClientName().equals(holder)) {
|
if (holder != null && !clientName.equals(holder)) {
|
||||||
throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
|
throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
|
||||||
+ pendingFile.getClientName() + " but is accessed by " + holder);
|
+ clientName + " but is accessed by " + holder);
|
||||||
}
|
}
|
||||||
INodeId.checkId(fileId, pendingFile);
|
INodeId.checkId(fileId, file);
|
||||||
return pendingFile;
|
return file;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -2907,7 +2906,7 @@ private boolean completeFileInternal(String src,
 UnresolvedLinkException, IOException {
 assert hasWriteLock();
 final INodesInPath iip = dir.getLastINodeInPath(src);
-final INodeFileUnderConstruction pendingFile;
+final INodeFile pendingFile;
 try {
 pendingFile = checkLease(src, fileId, holder, iip.getINode(0));
 } catch (LeaseExpiredException lee) {
@@ -3603,9 +3602,10 @@ void fsync(String src, String clientName, long lastBlockLength)
 checkOperation(OperationCategory.WRITE);
 checkNameNodeSafeMode("Cannot fsync file " + src);
 src = FSDirectory.resolvePath(src, pathComponents, dir);
-INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
+INodeFile pendingFile = checkLease(src, clientName);
 if (lastBlockLength > 0) {
-pendingFile.updateLengthOfLastBlock(lastBlockLength);
+pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
+pendingFile, lastBlockLength);
 }
 dir.persistBlocks(src, pendingFile, false);
 } finally {
@@ -3636,8 +3636,7 @@ boolean internalReleaseLease(Lease lease, String src,
 assert hasWriteLock();

 final INodesInPath iip = dir.getLastINodeInPath(src);
-final INodeFileUnderConstruction pendingFile
-= INodeFileUnderConstruction.valueOf(iip.getINode(0), src);
+final INodeFile pendingFile = iip.getINode(0).asFile();
 int nrBlocks = pendingFile.numBlocks();
 BlockInfo[] blocks = pendingFile.getBlocks();

@@ -3759,7 +3758,7 @@ boolean internalReleaseLease(Lease lease, String src,
 }

 private Lease reassignLease(Lease lease, String src, String newHolder,
-INodeFileUnderConstruction pendingFile) {
+INodeFile pendingFile) {
 assert hasWriteLock();
 if(newHolder == null)
 return lease;
@@ -3769,15 +3768,16 @@ private Lease reassignLease(Lease lease, String src, String newHolder,
 }

 Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
-INodeFileUnderConstruction pendingFile) {
+INodeFile pendingFile) {
 assert hasWriteLock();
-pendingFile.setClientName(newHolder);
+pendingFile.getFileUnderConstructionFeature().setClientName(newHolder);
 return leaseManager.reassignLease(lease, src, newHolder);
 }

-private void commitOrCompleteLastBlock(final INodeFileUnderConstruction fileINode,
+private void commitOrCompleteLastBlock(final INodeFile fileINode,
 final Block commitBlock) throws IOException {
 assert hasWriteLock();
+Preconditions.checkArgument(fileINode.isUnderConstruction());
 if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
 return;
 }
@@ -3795,18 +3795,20 @@ private void commitOrCompleteLastBlock(final INodeFileUnderConstruction fileINod
 }

 private void finalizeINodeFileUnderConstruction(String src,
-INodeFileUnderConstruction pendingFile, Snapshot latestSnapshot)
-throws IOException, UnresolvedLinkException {
+INodeFile pendingFile, Snapshot latestSnapshot) throws IOException,
+UnresolvedLinkException {
 assert hasWriteLock();
-leaseManager.removeLease(pendingFile.getClientName(), src);
+FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
+Preconditions.checkArgument(uc != null);
+leaseManager.removeLease(uc.getClientName(), src);

 pendingFile = pendingFile.recordModification(latestSnapshot,
 dir.getINodeMap());

 // The file is no longer pending.
-// Create permanent INode, update blocks
-final INodeFile newFile = pendingFile.toINodeFile(now());
-dir.replaceINodeFile(src, pendingFile, newFile);
+// Create permanent INode, update blocks. No need to replace the inode here
+// since we just remove the uc feature from pendingFile
+final INodeFile newFile = pendingFile.toCompleteFile(now());

 // close file and persist block allocations for this file
 dir.closeFile(src, newFile);
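The close path changes shape here: the old code built a brand-new INodeFile and swapped it into the directory tree, while the new code finalizes the same inode in place. A condensed sketch of the new flow (paraphrased from the hunk above; edit-log and snapshot handling omitted):

    // Finalizing a file is now feature removal, not inode replacement.
    FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
    Preconditions.checkArgument(uc != null);
    leaseManager.removeLease(uc.getClientName(), src); // release the lease
    // toCompleteFile() checks all blocks are COMPLETE, unlinks the feature,
    // and stamps the modification time -- returning the same object.
    final INodeFile newFile = pendingFile.toCompleteFile(now());

Since newFile and pendingFile are the same inode, dir.replaceINodeFile() is no longer needed.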
@@ -3823,12 +3825,12 @@ BlockInfo getStoredBlock(Block block) {
 public boolean isInSnapshot(BlockInfoUnderConstruction blockUC) {
 assert hasReadLock();
 final BlockCollection bc = blockUC.getBlockCollection();
-if (bc == null || !(bc instanceof INodeFileUnderConstruction)) {
+if (bc == null || !(bc instanceof INodeFile)
+|| !((INodeFile) bc).isUnderConstruction()) {
 return false;
 }

-INodeFileUnderConstruction inodeUC = (INodeFileUnderConstruction) blockUC
-.getBlockCollection();
+INodeFile inodeUC = (INodeFile) bc;
 String fullName = inodeUC.getName();
 try {
 if (fullName != null && fullName.startsWith(Path.SEPARATOR)
@@ -3906,11 +3908,9 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
 + recoveryId + " for block " + lastblock);
 }

-INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
-
 if (deleteblock) {
 Block blockToDel = ExtendedBlock.getLocalBlock(lastblock);
-boolean remove = pendingFile.removeLastBlock(blockToDel);
+boolean remove = iFile.removeLastBlock(blockToDel);
 if (remove) {
 blockManager.removeBlockFromMap(storedBlock);
 }
@@ -3955,14 +3955,14 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
 blockManager.getDatanodeManager().getDatanodeStorageInfos(
 trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
 trimmedStorages.toArray(new String[trimmedStorages.size()]));
-pendingFile.setLastBlock(storedBlock, trimmedStorageInfos);
+iFile.setLastBlock(storedBlock, trimmedStorageInfos);
 }

 if (closeFile) {
-src = closeFileCommitBlocks(pendingFile, storedBlock);
+src = closeFileCommitBlocks(iFile, storedBlock);
 } else {
 // If this commit does not want to close the file, persist blocks
-src = persistBlocks(pendingFile, false);
+src = persistBlocks(iFile, false);
 }
 } finally {
 writeUnlock();
@@ -3987,10 +3987,8 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
 * @throws IOException
 */
 @VisibleForTesting
-String closeFileCommitBlocks(INodeFileUnderConstruction pendingFile,
-BlockInfo storedBlock)
+String closeFileCommitBlocks(INodeFile pendingFile, BlockInfo storedBlock)
 throws IOException {
-
 String src = pendingFile.getFullPathName();

 // commit the last block and complete it if it has minimum replicas
@@ -4011,8 +4009,8 @@ String closeFileCommitBlocks(INodeFileUnderConstruction pendingFile,
 * @throws IOException
 */
 @VisibleForTesting
-String persistBlocks(INodeFileUnderConstruction pendingFile,
-boolean logRetryCache) throws IOException {
+String persistBlocks(INodeFile pendingFile, boolean logRetryCache)
+throws IOException {
 String src = pendingFile.getFullPathName();
 dir.persistBlocks(src, pendingFile, logRetryCache);
 return src;
@@ -5197,13 +5195,12 @@ private long getCompleteBlocksTotal() {
 try {
 for (Lease lease : leaseManager.getSortedLeases()) {
 for (String path : lease.getPaths()) {
-final INodeFileUnderConstruction cons;
+final INodeFile cons;
 try {
-cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path);
+cons = dir.getINode(path).asFile();
+Preconditions.checkState(cons.isUnderConstruction());
 } catch (UnresolvedLinkException e) {
 throw new AssertionError("Lease files should reside on this FS");
-} catch (IOException e) {
-throw new RuntimeException(e);
 }
 BlockInfo[] blocks = cons.getBlocks();
 if(blocks == null)
@@ -5779,7 +5776,7 @@ private long nextBlockId() throws IOException {
 return blockId;
 }

-private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
+private INodeFile checkUCBlock(ExtendedBlock block,
 String clientName) throws IOException {
 assert hasWriteLock();
 checkNameNodeSafeMode("Cannot get a new generation stamp and an "
@@ -5801,13 +5798,14 @@ private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
 }

 // check lease
-INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
-if (clientName == null || !clientName.equals(pendingFile.getClientName())) {
+if (clientName == null
+|| !clientName.equals(file.getFileUnderConstructionFeature()
+.getClientName())) {
 throw new LeaseExpiredException("Lease mismatch: " + block +
 " is accessed by a non lease holder " + clientName);
 }

-return pendingFile;
+return file;
 }

 /**
@@ -5918,8 +5916,7 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
 throws IOException {
 assert hasWriteLock();
 // check the vadility of the block and lease holder name
-final INodeFileUnderConstruction pendingFile
-= checkUCBlock(oldBlock, clientName);
+final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
 final BlockInfoUnderConstruction blockinfo
 = (BlockInfoUnderConstruction)pendingFile.getLastBlock();

@@ -5957,15 +5954,13 @@ void unprotectedChangeLease(String src, String dst) {
 * Serializes leases.
 */
 void saveFilesUnderConstruction(DataOutputStream out,
-Map<Long, INodeFileUnderConstruction> snapshotUCMap) throws IOException {
+Map<Long, INodeFile> snapshotUCMap) throws IOException {
 // This is run by an inferior thread of saveNamespace, which holds a read
 // lock on our behalf. If we took the read lock here, we could block
 // for fairness if a writer is waiting on the lock.
 synchronized (leaseManager) {
-Map<String, INodeFileUnderConstruction> nodes =
-leaseManager.getINodesUnderConstruction();
-for (Map.Entry<String, INodeFileUnderConstruction> entry
-: nodes.entrySet()) {
+Map<String, INodeFile> nodes = leaseManager.getINodesUnderConstruction();
+for (Map.Entry<String, INodeFile> entry : nodes.entrySet()) {
 // TODO: for HDFS-5428, because of rename operations, some
 // under-construction files that are
 // in the current fs directory can also be captured in the
@@ -5974,13 +5969,11 @@ void saveFilesUnderConstruction(DataOutputStream out,
 }

 out.writeInt(nodes.size() + snapshotUCMap.size()); // write the size
-for (Map.Entry<String, INodeFileUnderConstruction> entry
-: nodes.entrySet()) {
+for (Map.Entry<String, INodeFile> entry : nodes.entrySet()) {
 FSImageSerialization.writeINodeUnderConstruction(
 out, entry.getValue(), entry.getKey());
 }
-for (Map.Entry<Long, INodeFileUnderConstruction> entry
-: snapshotUCMap.entrySet()) {
+for (Map.Entry<Long, INodeFile> entry : snapshotUCMap.entrySet()) {
 // for those snapshot INodeFileUC, we use "/.reserved/.inodes/<inodeid>"
 // as their paths
 StringBuilder b = new StringBuilder();
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+
+/**
+ * I-node for file being written.
+ */
+@InterfaceAudience.Private
+public class FileUnderConstructionFeature extends INodeFile.Feature {
+private String clientName; // lease holder
+private final String clientMachine;
+// if client is a cluster node too.
+private final DatanodeDescriptor clientNode;
+
+public FileUnderConstructionFeature(final String clientName,
+final String clientMachine,
+final DatanodeDescriptor clientNode) {
+this.clientName = clientName;
+this.clientMachine = clientMachine;
+this.clientNode = clientNode;
+}
+
+public String getClientName() {
+return clientName;
+}
+
+void setClientName(String clientName) {
+this.clientName = clientName;
+}
+
+public String getClientMachine() {
+return clientMachine;
+}
+
+public DatanodeDescriptor getClientNode() {
+return clientNode;
+}
+
+/**
+ * Update the length for the last block
+ *
+ * @param lastBlockLength
+ * The length of the last block reported from client
+ * @throws IOException
+ */
+void updateLengthOfLastBlock(INodeFile f, long lastBlockLength)
+throws IOException {
+BlockInfo lastBlock = f.getLastBlock();
+assert (lastBlock != null) : "The last block for path "
++ f.getFullPathName() + " is null when updating its length";
+assert (lastBlock instanceof BlockInfoUnderConstruction)
+: "The last block for path " + f.getFullPathName()
++ " is not a BlockInfoUnderConstruction when updating its length";
+lastBlock.setNumBytes(lastBlockLength);
+}
+
+/**
+ * When deleting a file in the current fs directory, and the file is contained
+ * in a snapshot, we should delete the last block if it's under construction
+ * and its size is 0.
+ */
+void cleanZeroSizeBlock(final INodeFile f,
+final BlocksMapUpdateInfo collectedBlocks) {
+final BlockInfo[] blocks = f.getBlocks();
+if (blocks != null && blocks.length > 0
+&& blocks[blocks.length - 1] instanceof BlockInfoUnderConstruction) {
+BlockInfoUnderConstruction lastUC =
+(BlockInfoUnderConstruction) blocks[blocks.length - 1];
+if (lastUC.getNumBytes() == 0) {
+// this is a 0-sized block. do not need check its UC state here
+collectedBlocks.addDeleteBlock(lastUC);
+f.removeLastBlock(lastUC);
+}
+}
+}
+}
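The new feature class holds only the lease state (holder, machine, optional cluster node), so marking any INodeFile as open for write is just attaching one of these objects. A schematic of the intended call-site usage (fragment only; "client-1" and "host-1" are made-up values, and the real call sites are in FSNamesystem):

    // Attaching and querying the under-construction feature (schematic).
    file.toUnderConstruction("client-1", "host-1", null); // adds the feature
    FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
    assert uc != null && file.isUnderConstruction();
    String holder = uc.getClientName(); // "client-1"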
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -205,23 +204,6 @@ private final <N extends INodeDirectory> N replaceSelf(final N newDir,
 return newDir;
 }

-/**
- * Used when load fileUC from fsimage. The file to be replaced is actually
- * only in snapshot, thus may not be contained in the children list.
- * See HDFS-5428 for details.
- */
-public void replaceChildFileInSnapshot(INodeFile oldChild,
-final INodeFile newChild) {
-if (children != null) {
-final int i = searchChildren(newChild.getLocalNameBytes());
-if (i >= 0 && children.get(i).getId() == oldChild.getId()) {
-// no need to consider reference node here, since we already do the
-// replacement in FSImageFormat.Loader#loadFilesUnderConstruction
-children.set(i, newChild);
-}
-}
-}
-
 /** Replace the given child with a new child. */
 public void replaceChild(INode oldChild, final INode newChild,
 final INodeMap inodeMap) {
@@ -291,17 +273,6 @@ INodeFileWithSnapshot replaceChild4INodeFileWithSnapshot(
 return newChild;
 }

-/** Replace a child {@link INodeFile} with an {@link INodeFileUnderConstructionWithSnapshot}. */
-INodeFileUnderConstructionWithSnapshot replaceChild4INodeFileUcWithSnapshot(
-final INodeFileUnderConstruction child, final INodeMap inodeMap) {
-Preconditions.checkArgument(!(child instanceof INodeFileUnderConstructionWithSnapshot),
-"Child file is already an INodeFileUnderConstructionWithSnapshot, child=" + child);
-final INodeFileUnderConstructionWithSnapshot newChild
-= new INodeFileUnderConstructionWithSnapshot(child, null);
-replaceChildFile(child, newChild, inodeMap);
-return newChild;
-}
-
 @Override
 public INodeDirectory recordModification(Snapshot latest,
 final INodeMap inodeMap) throws QuotaExceededException {
@@ -20,17 +20,19 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.Arrays;
 import java.util.List;

 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
@@ -45,6 +47,22 @@
 @InterfaceAudience.Private
 public class INodeFile extends INodeWithAdditionalFields
 implements INodeFileAttributes, BlockCollection {
+/**
+ * A feature contains specific information for a type of INodeFile. E.g.,
+ * we can have separate features for Under-Construction and Snapshot.
+ */
+public static abstract class Feature {
+private Feature nextFeature;
+
+public Feature getNextFeature() {
+return nextFeature;
+}
+
+public void setNextFeature(Feature next) {
+this.nextFeature = next;
+}
+}
+
 /** The same as valueOf(inode, path, false). */
 public static INodeFile valueOf(INode inode, String path
 ) throws FileNotFoundException {
@@ -106,8 +124,11 @@ static long combinePreferredBlockSize(long header, long blockSize) {

 private BlockInfo[] blocks;

-INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
-BlockInfo[] blklist, short replication, long preferredBlockSize) {
+private Feature headFeature;
+
+INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
+long atime, BlockInfo[] blklist, short replication,
+long preferredBlockSize) {
 super(id, name, permissions, mtime, atime);
 header = HeaderFormat.combineReplication(header, replication);
 header = HeaderFormat.combinePreferredBlockSize(header, preferredBlockSize);
@@ -118,6 +139,48 @@ public INodeFile(INodeFile that) {
 super(that);
 this.header = that.header;
 this.blocks = that.blocks;
+this.headFeature = that.headFeature;
+}
+
+/**
+ * If the inode contains a {@link FileUnderConstructionFeature}, return it;
+ * otherwise, return null.
+ */
+public final FileUnderConstructionFeature getFileUnderConstructionFeature() {
+for (Feature f = this.headFeature; f != null; f = f.nextFeature) {
+if (f instanceof FileUnderConstructionFeature) {
+return (FileUnderConstructionFeature) f;
+}
+}
+return null;
+}
+
+/** Is this file under construction? */
+@Override // BlockCollection
+public boolean isUnderConstruction() {
+return getFileUnderConstructionFeature() != null;
+}
+
+void addFeature(Feature f) {
+f.nextFeature = headFeature;
+headFeature = f;
+}
+
+void removeFeature(Feature f) {
+if (f == headFeature) {
+headFeature = headFeature.nextFeature;
+return;
+} else if (headFeature != null) {
+Feature prev = headFeature;
+Feature curr = headFeature.nextFeature;
+for (; curr != null && curr != f; prev = curr, curr = curr.nextFeature)
+;
+if (curr != null) {
+prev.nextFeature = curr.nextFeature;
+return;
+}
+}
+throw new IllegalStateException("Feature " + f + " not found.");
 }

 /** @return true unconditionally. */
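The feature list added above is a bare singly-linked list threaded through the inode: addFeature() pushes at the head in O(1), removeFeature() unlinks with a linear scan, and lookups scan by type. A self-contained, runnable stand-in of the same pattern (simplified names, not the HDFS classes):

    // Simplified stand-in for the INodeFile.Feature list pattern.
    public class FeatureListDemo {
      static abstract class Feature {
        Feature next; // newest feature sits at the head of the list
      }

      static class UnderConstruction extends Feature {
        final String clientName;
        UnderConstruction(String clientName) { this.clientName = clientName; }
      }

      private Feature head;

      void add(Feature f) {      // O(1) push onto the head
        f.next = head;
        head = f;
      }

      void remove(Feature f) {   // O(n) unlink of a known feature
        if (f == head) {
          head = head.next;
          return;
        }
        for (Feature prev = head; prev != null; prev = prev.next) {
          if (prev.next == f) {
            prev.next = f.next;
            return;
          }
        }
        throw new IllegalStateException("Feature " + f + " not found.");
      }

      <T extends Feature> T find(Class<T> type) { // linear scan by type
        for (Feature f = head; f != null; f = f.next) {
          if (type.isInstance(f)) {
            return type.cast(f);
          }
        }
        return null;
      }

      public static void main(String[] args) {
        FeatureListDemo list = new FeatureListDemo();
        UnderConstruction uc = new UnderConstruction("client-1");
        list.add(uc);
        assert list.find(UnderConstruction.class) == uc;
        list.remove(uc);
        assert list.find(UnderConstruction.class) == null; // feature detached
      }
    }

Keeping a list rather than a single field leaves room for additional feature types (the class javadoc above mentions Snapshot) without further subclassing.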
@@ -132,22 +195,88 @@ public final INodeFile asFile() {
 return this;
 }

-/** Is this file under construction? */
-public boolean isUnderConstruction() {
-return false;
-}
+/* Start of Under-Construction Feature */

 /** Convert this file to an {@link INodeFileUnderConstruction}. */
-public INodeFileUnderConstruction toUnderConstruction(
-String clientName,
-String clientMachine,
+public INodeFile toUnderConstruction(String clientName, String clientMachine,
 DatanodeDescriptor clientNode) {
 Preconditions.checkState(!isUnderConstruction(),
 "file is already an INodeFileUnderConstruction");
-return new INodeFileUnderConstruction(this,
+FileUnderConstructionFeature uc = new FileUnderConstructionFeature(
 clientName, clientMachine, clientNode);
+addFeature(uc);
+return this;
 }

+/**
+ * Convert the file to a complete file, i.e., to remove the Under-Construction
+ * feature.
+ */
+public INodeFile toCompleteFile(long mtime) {
+FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
+if (uc != null) {
+assertAllBlocksComplete();
+removeFeature(uc);
+this.setModificationTime(mtime);
+}
+return this;
+}
+
+/** Assert all blocks are complete. */
+private void assertAllBlocksComplete() {
+if (blocks == null) {
+return;
+}
+for (int i = 0; i < blocks.length; i++) {
+Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
++ " %s %s since blocks[%s] is non-complete, where blocks=%s.",
+getClass().getSimpleName(), this, i, Arrays.asList(blocks));
+}
+}
+
+@Override //BlockCollection
+public void setBlock(int index, BlockInfo blk) {
+this.blocks[index] = blk;
+}
+
+@Override // BlockCollection
+public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
+DatanodeStorageInfo[] locations) throws IOException {
+Preconditions.checkState(isUnderConstruction());
+
+if (numBlocks() == 0) {
+throw new IOException("Failed to set last block: File is empty.");
+}
+BlockInfoUnderConstruction ucBlock =
+lastBlock.convertToBlockUnderConstruction(
+BlockUCState.UNDER_CONSTRUCTION, locations);
+ucBlock.setBlockCollection(this);
+setBlock(numBlocks() - 1, ucBlock);
+return ucBlock;
+}
+
+/**
+ * Remove a block from the block list. This block should be
+ * the last one on the list.
+ */
+boolean removeLastBlock(Block oldblock) {
+if (blocks == null || blocks.length == 0) {
+return false;
+}
+int size_1 = blocks.length - 1;
+if (!blocks[size_1].equals(oldblock)) {
+return false;
+}
+
+//copy to a new list
+BlockInfo[] newlist = new BlockInfo[size_1];
+System.arraycopy(blocks, 0, newlist, 0, size_1);
+setBlocks(newlist);
+return true;
+}
+
+/* End of Under-Construction Feature */
+
 @Override
 public INodeFileAttributes getSnapshotINode(final Snapshot snapshot) {
 return this;
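Taken together, toUnderConstruction() and toCompleteFile() turn the old subclass round-trip into two in-place transitions on one inode. Schematically (fragment using the patch's own API; error handling omitted):

    // Lifecycle under the feature model (schematic fragment).
    file.toUnderConstruction("clientA", "hostA", null); // attach UC feature
    // ... client writes; setLastBlock()/updateLengthOfLastBlock() run
    //     against the same inode while it stays in the directory tree ...
    file.toCompleteFile(now());                         // verify + detach
    assert !file.isUnderConstruction();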
@@ -268,11 +397,6 @@ void addBlock(BlockInfo newblock) {
 }
 }

-/** Set the block of the file at the given index. */
-public void setBlock(int idx, BlockInfo blk) {
-this.blocks[idx] = blk;
-}
-
 /** Set the blocks. */
 public void setBlocks(BlockInfo[] blocks) {
 this.blocks = blocks;
@@ -288,6 +412,11 @@ public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior,
 // this only happens when deleting the current file
 computeQuotaUsage(counts, false);
 destroyAndCollectBlocks(collectedBlocks, removedINodes);
+} else if (snapshot == null && prior != null) {
+FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
+if (uc != null) {
+uc.cleanZeroSizeBlock(this, collectedBlocks);
+}
 }
 return counts;
 }
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.server.blockmanagement.*;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.server.namenode.Quota.Counts;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
-
-import com.google.common.base.Preconditions;
-
-/**
- * I-node for file being written.
- */
-@InterfaceAudience.Private
-public class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection {
-/** Cast INode to INodeFileUnderConstruction. */
-public static INodeFileUnderConstruction valueOf(INode inode, String path
-) throws FileNotFoundException {
-final INodeFile file = INodeFile.valueOf(inode, path);
-if (!file.isUnderConstruction()) {
-throw new FileNotFoundException("File is not under construction: " + path);
-}
-return (INodeFileUnderConstruction)file;
-}
-
-private String clientName; // lease holder
-private final String clientMachine;
-private final DatanodeDescriptor clientNode; // if client is a cluster node too.
-
-INodeFileUnderConstruction(long id,
-PermissionStatus permissions,
-short replication,
-long preferredBlockSize,
-long modTime,
-String clientName,
-String clientMachine,
-DatanodeDescriptor clientNode) {
-this(id, null, replication, modTime, preferredBlockSize, BlockInfo.EMPTY_ARRAY,
-permissions, clientName, clientMachine, clientNode);
-}
-
-INodeFileUnderConstruction(long id,
-byte[] name,
-short blockReplication,
-long modificationTime,
-long preferredBlockSize,
-BlockInfo[] blocks,
-PermissionStatus perm,
-String clientName,
-String clientMachine,
-DatanodeDescriptor clientNode) {
-super(id, name, perm, modificationTime, modificationTime,
-blocks, blockReplication, preferredBlockSize);
-this.clientName = clientName;
-this.clientMachine = clientMachine;
-this.clientNode = clientNode;
-}
-
-public INodeFileUnderConstruction(final INodeFile that,
-final String clientName,
-final String clientMachine,
-final DatanodeDescriptor clientNode) {
-super(that);
-this.clientName = clientName;
-this.clientMachine = clientMachine;
-this.clientNode = clientNode;
-}
-
-public String getClientName() {
-return clientName;
-}
-
-void setClientName(String clientName) {
-this.clientName = clientName;
-}
-
-public String getClientMachine() {
-return clientMachine;
-}
-
-public DatanodeDescriptor getClientNode() {
-return clientNode;
-}
-
-/** @return true unconditionally. */
-@Override
-public final boolean isUnderConstruction() {
-return true;
-}
-
-/**
- * Converts an INodeFileUnderConstruction to an INodeFile.
- * The original modification time is used as the access time.
- * The new modification is the specified mtime.
- */
-protected INodeFile toINodeFile(long mtime) {
-assertAllBlocksComplete();
-
-final INodeFile f = new INodeFile(getId(), getLocalNameBytes(),
-getPermissionStatus(), mtime, getModificationTime(),
-getBlocks(), getFileReplication(), getPreferredBlockSize());
-f.setParent(getParent());
-return f;
-}
-
-@Override
-public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior,
-final BlocksMapUpdateInfo collectedBlocks,
-final List<INode> removedINodes, final boolean countDiffChange)
-throws QuotaExceededException {
-if (snapshot == null && prior != null) {
-cleanZeroSizeBlock(collectedBlocks);
-return Counts.newInstance();
-} else {
-return super.cleanSubtree(snapshot, prior, collectedBlocks,
-removedINodes, countDiffChange);
-}
-}
-
-/**
- * When deleting a file in the current fs directory, and the file is contained
- * in a snapshot, we should delete the last block if it's under construction
- * and its size is 0.
- */
-private void cleanZeroSizeBlock(final BlocksMapUpdateInfo collectedBlocks) {
-final BlockInfo[] blocks = getBlocks();
-if (blocks != null && blocks.length > 0
-&& blocks[blocks.length - 1] instanceof BlockInfoUnderConstruction) {
-BlockInfoUnderConstruction lastUC =
-(BlockInfoUnderConstruction) blocks[blocks.length - 1];
-if (lastUC.getNumBytes() == 0) {
-// this is a 0-sized block. do not need check its UC state here
-collectedBlocks.addDeleteBlock(lastUC);
-removeLastBlock(lastUC);
-}
-}
-}
-
-@Override
-public INodeFileUnderConstruction recordModification(final Snapshot latest,
-final INodeMap inodeMap) throws QuotaExceededException {
-if (isInLatestSnapshot(latest)) {
-INodeFileUnderConstructionWithSnapshot newFile = getParent()
-.replaceChild4INodeFileUcWithSnapshot(this, inodeMap)
-.recordModification(latest, inodeMap);
-return newFile;
-} else {
-return this;
-}
-}
-
-/** Assert all blocks are complete. */
-protected void assertAllBlocksComplete() {
-final BlockInfo[] blocks = getBlocks();
-for (int i = 0; i < blocks.length; i++) {
-Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
-+ " %s %s since blocks[%s] is non-complete, where blocks=%s.",
-getClass().getSimpleName(), this, i, Arrays.asList(getBlocks()));
-}
-}
-
-/**
- * Remove a block from the block list. This block should be
- * the last one on the list.
- */
-boolean removeLastBlock(Block oldblock) {
-final BlockInfo[] blocks = getBlocks();
-if (blocks == null || blocks.length == 0) {
-return false;
-}
-int size_1 = blocks.length - 1;
-if (!blocks[size_1].equals(oldblock)) {
-return false;
-}
-
-//copy to a new list
-BlockInfo[] newlist = new BlockInfo[size_1];
-System.arraycopy(blocks, 0, newlist, 0, size_1);
-setBlocks(newlist);
-return true;
-}
-
-/**
- * Convert the last block of the file to an under-construction block.
- * Set its locations.
- */
-@Override
-public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
-DatanodeStorageInfo[] targets) throws IOException {
-if (numBlocks() == 0) {
-throw new IOException("Failed to set last block: File is empty.");
-}
-BlockInfoUnderConstruction ucBlock =
-lastBlock.convertToBlockUnderConstruction(
-BlockUCState.UNDER_CONSTRUCTION, targets);
-ucBlock.setBlockCollection(this);
-setBlock(numBlocks()-1, ucBlock);
-return ucBlock;
-}
-
-/**
- * Update the length for the last block
- *
- * @param lastBlockLength
- * The length of the last block reported from client
- * @throws IOException
- */
-void updateLengthOfLastBlock(long lastBlockLength) throws IOException {
-BlockInfo lastBlock = this.getLastBlock();
-assert (lastBlock != null) : "The last block for path "
-+ this.getFullPathName() + " is null when updating its length";
-assert (lastBlock instanceof BlockInfoUnderConstruction) : "The last block for path "
-+ this.getFullPathName()
-+ " is not a BlockInfoUnderConstruction when updating its length";
-lastBlock.setNumBytes(lastBlockLength);
-}
-
-}
@@ -399,14 +399,14 @@ public void run() {
 * @return list of inodes
 * @throws UnresolvedLinkException
 */
-Map<String, INodeFileUnderConstruction> getINodesUnderConstruction() {
-Map<String, INodeFileUnderConstruction> inodes =
-new TreeMap<String, INodeFileUnderConstruction>();
+Map<String, INodeFile> getINodesUnderConstruction() {
+Map<String, INodeFile> inodes = new TreeMap<String, INodeFile>();
 for (String p : sortedLeasesByPath.keySet()) {
 // verify that path exists in namespace
 try {
-INode node = fsnamesystem.dir.getINode(p);
-inodes.put(p, INodeFileUnderConstruction.valueOf(node, p));
+INodeFile node = INodeFile.valueOf(fsnamesystem.dir.getINode(p), p);
+Preconditions.checkState(node.isUnderConstruction());
+inodes.put(p, node);
 } catch (IOException ioe) {
 LOG.error(ioe);
 }
@@ -38,7 +38,6 @@
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryAttributes;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
-import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeMap;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
@@ -589,14 +588,6 @@ public boolean removeChild(INode child, Snapshot latest,
 return removed;
 }

-@Override
-public void replaceChildFileInSnapshot(final INodeFile oldChild,
-final INodeFile newChild) {
-super.replaceChildFileInSnapshot(oldChild, newChild);
-diffs.replaceChild(ListType.DELETED, oldChild, newChild);
-diffs.replaceChild(ListType.CREATED, oldChild, newChild);
-}
-
 @Override
 public void replaceChild(final INode oldChild, final INode newChild,
 final INodeMap inodeMap) {
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode.snapshot;
-
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.namenode.INodeFile;
-import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
-import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
-import org.apache.hadoop.hdfs.server.namenode.INodeMap;
-import org.apache.hadoop.hdfs.server.namenode.Quota;
-
-/**
- * Represent an {@link INodeFileUnderConstruction} that is snapshotted.
- */
-@InterfaceAudience.Private
-public class INodeFileUnderConstructionWithSnapshot
-extends INodeFileUnderConstruction implements FileWithSnapshot {
-private final FileDiffList diffs;
-private boolean isCurrentFileDeleted = false;
-
-INodeFileUnderConstructionWithSnapshot(final INodeFile f,
-final String clientName,
-final String clientMachine,
-final DatanodeDescriptor clientNode,
-final FileDiffList diffs) {
-super(f, clientName, clientMachine, clientNode);
-this.diffs = diffs != null? diffs: new FileDiffList();
-}
-
-/**
- * Construct an {@link INodeFileUnderConstructionWithSnapshot} based on an
- * {@link INodeFileUnderConstruction}.
- *
- * @param f The given {@link INodeFileUnderConstruction} instance
- */
-public INodeFileUnderConstructionWithSnapshot(INodeFileUnderConstruction f,
-final FileDiffList diffs) {
-this(f, f.getClientName(), f.getClientMachine(), f.getClientNode(), diffs);
-}
-
-@Override
-protected INodeFileWithSnapshot toINodeFile(final long mtime) {
-assertAllBlocksComplete();
-final long atime = getModificationTime();
-final INodeFileWithSnapshot f = new INodeFileWithSnapshot(this, getDiffs());
-f.setModificationTime(mtime);
-f.setAccessTime(atime);
-return f;
-}
-
-@Override
-public boolean isCurrentFileDeleted() {
-return isCurrentFileDeleted;
-}
-
-@Override
-public void deleteCurrentFile() {
-isCurrentFileDeleted = true;
-}
-
-@Override
-public INodeFileAttributes getSnapshotINode(Snapshot snapshot) {
-return diffs.getSnapshotINode(snapshot, this);
-}
-
-@Override
-public INodeFileUnderConstructionWithSnapshot recordModification(
-final Snapshot latest, final INodeMap inodeMap)
-throws QuotaExceededException {
-if (isInLatestSnapshot(latest) && !shouldRecordInSrcSnapshot(latest)) {
-diffs.saveSelf2Snapshot(latest, this, null);
-}
-return this;
-}
-
-@Override
-public INodeFile asINodeFile() {
-return this;
-}
-
-@Override
-public FileDiffList getDiffs() {
-return diffs;
-}
-
-@Override
-public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior,
-final BlocksMapUpdateInfo collectedBlocks,
-final List<INode> removedINodes, final boolean countDiffChange)
-throws QuotaExceededException {
-if (snapshot == null) { // delete the current file
-if (!isCurrentFileDeleted()) {
-recordModification(prior, null);
-deleteCurrentFile();
-}
-Util.collectBlocksAndClear(this, collectedBlocks, removedINodes);
-return Quota.Counts.newInstance();
-} else { // delete a snapshot
-prior = getDiffs().updatePrior(snapshot, prior);
-return diffs.deleteSnapshotDiff(snapshot, prior, this, collectedBlocks,
-removedINodes, countDiffChange);
-}
-}
-
-@Override
-public String toDetailString() {
-return super.toDetailString()
-+ (isCurrentFileDeleted()? " (DELETED), ": ", ") + diffs;
-}
-}
@@ -21,7 +21,6 @@

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
@@ -47,15 +46,6 @@ public INodeFileWithSnapshot(INodeFile f, FileDiffList diffs) {
 this.diffs = diffs != null? diffs: new FileDiffList();
 }

-@Override
-public INodeFileUnderConstructionWithSnapshot toUnderConstruction(
-final String clientName,
-final String clientMachine,
-final DatanodeDescriptor clientNode) {
-return new INodeFileUnderConstructionWithSnapshot(this,
-clientName, clientMachine, clientNode, getDiffs());
-}
-
 @Override
 public boolean isCurrentFileDeleted() {
 return isCurrentFileDeleted;
@ -82,9 +82,10 @@ static void addFiles(FSEditLog editLog, int numFiles, short replication,
|
|||||||
blocks[iB].setBlockId(currentBlockId++);
|
blocks[iB].setBlockId(currentBlockId++);
|
||||||
}
|
}
|
||||||
|
|
||||||
INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
|
final INodeFile inode = new INodeFile(inodeId.nextValue(), null,
|
||||||
inodeId.nextValue(), null, replication, 0, blockSize, blocks, p, "",
|
p, 0L, 0L, blocks, replication, blockSize);
|
||||||
"", null);
|
inode.toUnderConstruction("", "", null);
|
||||||
|
|
||||||
// Append path to filename with information about blockIDs
|
// Append path to filename with information about blockIDs
|
||||||
String path = "_" + iF + "_B" + blocks[0].getBlockId() +
|
String path = "_" + iF + "_B" + blocks[0].getBlockId() +
|
||||||
"_to_B" + blocks[blocksPerFile-1].getBlockId() + "_";
|
"_to_B" + blocks[blocksPerFile-1].getBlockId() + "_";
|
||||||
@@ -96,9 +97,10 @@ static void addFiles(FSEditLog editLog, int numFiles, short replication,
         dirInode = new INodeDirectory(inodeId.nextValue(), null, p, 0L);
         editLog.logMkDir(currentDir, dirInode);
       }
-      editLog.logOpenFile(filePath,
-          new INodeFileUnderConstruction(inodeId.nextValue(), p, replication,
-              0, blockSize, "", "", null), false);
+      INodeFile fileUc = new INodeFile(inodeId.nextValue(), null,
+          p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
+      fileUc.toUnderConstruction("", "", null);
+      editLog.logOpenFile(filePath, fileUc, false);
       editLog.logCloseFile(filePath, inode);
 
       if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks
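
Both hunks in this edits-log generator follow the same two-step idiom that replaces the one-shot INodeFileUnderConstruction constructor: build a plain INodeFile, then mark it under construction. If more call sites accumulated, a small helper along these lines would keep them consistent (hypothetical, not part of the patch; INodeFile, BlockInfo, and PermissionStatus are the real HDFS types):

    // Hypothetical convenience wrapper around the idiom shown above.
    static INodeFile newFileUnderConstruction(long inodeId, PermissionStatus p,
        short replication, long blockSize, BlockInfo[] blocks) {
      INodeFile file = new INodeFile(inodeId, null /* name */, p,
          0L /* mtime */, 0L /* atime */, blocks, replication, blockSize);
      file.toUnderConstruction("" /* clientName */, "" /* clientMachine */,
          null /* clientNode */);
      return file;
    }
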
@@ -43,8 +43,7 @@ public class TestCommitBlockSynchronization {
   private static final long length = 200;
   private static final long genStamp = 300;
 
-  private FSNamesystem makeNameSystemSpy(Block block,
-      INodeFileUnderConstruction file)
+  private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
       throws IOException {
     Configuration conf = new Configuration();
     FSImage image = new FSImage(conf);
@@ -58,21 +57,26 @@ private FSNamesystem makeNameSystemSpy(Block block,
     blockInfo.setGenerationStamp(genStamp);
     blockInfo.initializeBlockRecovery(genStamp);
     doReturn(true).when(file).removeLastBlock(any(Block.class));
+    doReturn(true).when(file).isUnderConstruction();
 
     doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class));
     doReturn("").when(namesystemSpy).closeFileCommitBlocks(
-        any(INodeFileUnderConstruction.class),
-        any(BlockInfo.class));
+        any(INodeFile.class), any(BlockInfo.class));
     doReturn("").when(namesystemSpy).persistBlocks(
-        any(INodeFileUnderConstruction.class), anyBoolean());
+        any(INodeFile.class), anyBoolean());
     doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
 
     return namesystemSpy;
   }
 
+  private INodeFile mockFileUnderConstruction() {
+    INodeFile file = mock(INodeFile.class);
+    return file;
+  }
+
   @Test
   public void testCommitBlockSynchronization() throws IOException {
-    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    INodeFile file = mockFileUnderConstruction();
     Block block = new Block(blockId, length, genStamp);
     FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
     DatanodeID[] newTargets = new DatanodeID[0];
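
Note the new doReturn(true).when(file).isUnderConstruction() stub: now that the namesystem checks the flag instead of downcasting to a subclass, a plain INodeFile mock would otherwise report itself as a completed file and commitBlockSynchronization would bail out early. A variation that folds the stub into the factory so no test can forget it (illustrative only; the patch keeps the stub in makeNameSystemSpy instead):

    // Illustrative variant of the helper above, with the UC flag stubbed
    // at creation time.
    private INodeFile mockFileUnderConstruction() {
      INodeFile file = mock(INodeFile.class);
      doReturn(true).when(file).isUnderConstruction();
      return file;
    }
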
@@ -100,7 +104,7 @@ public void testCommitBlockSynchronization() throws IOException {
 
   @Test
   public void testCommitBlockSynchronization2() throws IOException {
-    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    INodeFile file = mockFileUnderConstruction();
     Block block = new Block(blockId, length, genStamp);
     FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
     DatanodeID[] newTargets = new DatanodeID[0];
@@ -124,7 +128,7 @@ public void testCommitBlockSynchronization2() throws IOException {
 
   @Test
   public void testCommitBlockSynchronizationWithDelete() throws IOException {
-    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    INodeFile file = mockFileUnderConstruction();
     Block block = new Block(blockId, length, genStamp);
     FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
     DatanodeID[] newTargets = new DatanodeID[0];
@@ -144,7 +148,7 @@ public void testCommitBlockSynchronizationWithDelete() throws IOException {
 
   @Test
   public void testCommitBlockSynchronizationWithClose() throws IOException {
-    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    INodeFile file = mockFileUnderConstruction();
     Block block = new Block(blockId, length, genStamp);
     FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
     DatanodeID[] newTargets = new DatanodeID[0];
@@ -171,7 +175,7 @@ public void testCommitBlockSynchronizationWithClose() throws IOException {
   @Test
   public void testCommitBlockSynchronizationWithCloseAndNonExistantTarget()
       throws IOException {
-    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    INodeFile file = mockFileUnderConstruction();
     Block block = new Block(blockId, length, genStamp);
     FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
     DatanodeID[] newTargets = new DatanodeID[]{
@@ -63,6 +63,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@@ -152,9 +153,10 @@ public void run() {
       FSEditLog editLog = namesystem.getEditLog();
 
       for (int i = 0; i < numTransactions; i++) {
-        INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
-            namesystem.allocateNewInodeId(), p, replication, blockSize, 0, "",
-            "", null);
+        INodeFile inode = new INodeFile(namesystem.allocateNewInodeId(), null,
+            p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
+        inode.toUnderConstruction("", "", null);
 
         editLog.logOpenFile("/filename" + (startIndex + i), inode, false);
         editLog.logCloseFile("/filename" + (startIndex + i), inode);
         editLog.logSync();
@@ -29,6 +29,8 @@
 import java.util.Arrays;
 import java.util.List;
 
+import junit.framework.Assert;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -59,6 +61,7 @@
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.util.Time;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -286,14 +289,6 @@ public void testValueOf () throws IOException {
       assertTrue(fnfe.getMessage().contains("File does not exist"));
     }
 
-    //cast to INodeFileUnderConstruction, should fail
-    try {
-      INodeFileUnderConstruction.valueOf(from, path);
-      fail();
-    } catch(FileNotFoundException fnfe) {
-      assertTrue(fnfe.getMessage().contains("File does not exist"));
-    }
-
     //cast to INodeDirectory, should fail
     try {
       INodeDirectory.valueOf(from, path);
@@ -310,14 +305,6 @@ public void testValueOf () throws IOException {
       final INodeFile f = INodeFile.valueOf(from, path);
       assertTrue(f == from);
 
-      //cast to INodeFileUnderConstruction, should fail
-      try {
-        INodeFileUnderConstruction.valueOf(from, path);
-        fail();
-      } catch(IOException ioe) {
-        assertTrue(ioe.getMessage().contains("File is not under construction"));
-      }
-
       //cast to INodeDirectory, should fail
       try {
         INodeDirectory.valueOf(from, path);
@@ -328,19 +315,14 @@ public void testValueOf () throws IOException {
     }
 
     {//cast from INodeFileUnderConstruction
-      final INode from = new INodeFileUnderConstruction(
-          INodeId.GRANDFATHER_INODE_ID, perm, replication, 0L, 0L, "client",
-          "machine", null);
+      final INode from = new INodeFile(
+          INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, null, replication, 1024L);
+      from.asFile().toUnderConstruction("client", "machine", null);
 
       //cast to INodeFile, should success
       final INodeFile f = INodeFile.valueOf(from, path);
       assertTrue(f == from);
 
-      //cast to INodeFileUnderConstruction, should success
-      final INodeFileUnderConstruction u = INodeFileUnderConstruction.valueOf(
-          from, path);
-      assertTrue(u == from);
-
       //cast to INodeDirectory, should fail
       try {
         INodeDirectory.valueOf(from, path);
@@ -362,14 +344,6 @@ public void testValueOf () throws IOException {
       assertTrue(fnfe.getMessage().contains("Path is not a file"));
     }
 
-    //cast to INodeFileUnderConstruction, should fail
-    try {
-      INodeFileUnderConstruction.valueOf(from, path);
-      fail();
-    } catch(FileNotFoundException fnfe) {
-      assertTrue(fnfe.getMessage().contains("Path is not a file"));
-    }
-
     //cast to INodeDirectory, should success
     final INodeDirectory d = INodeDirectory.valueOf(from, path);
     assertTrue(d == from);
@@ -1047,4 +1021,24 @@ public void testFilesInGetListingOps() throws Exception {
       }
     }
   }
+
+  @Test
+  public void testFileUnderConstruction() {
+    replication = 3;
+    final INodeFile file = new INodeFile(INodeId.GRANDFATHER_INODE_ID, null,
+        perm, 0L, 0L, null, replication, 1024L);
+    assertFalse(file.isUnderConstruction());
+
+    final String clientName = "client";
+    final String clientMachine = "machine";
+    file.toUnderConstruction(clientName, clientMachine, null);
+    assertTrue(file.isUnderConstruction());
+    FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
+    assertEquals(clientName, uc.getClientName());
+    assertEquals(clientMachine, uc.getClientMachine());
+    Assert.assertNull(uc.getClientNode());
+
+    file.toCompleteFile(Time.now());
+    assertFalse(file.isUnderConstruction());
+  }
 }
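
The new test pins down the feature lifecycle: a freshly constructed INodeFile is complete, toUnderConstruction() attaches the FileUnderConstructionFeature, and toCompleteFile() detaches it. Callers that need lease details therefore have to check the flag before touching the feature; a small illustrative guard (not from the patch) built from the same methods the test exercises:

    // Illustrative guard, not part of the patch: the feature exists only
    // while the file is open for write, so a completed file has no lease.
    static String leaseHolder(INodeFile file) {
      if (!file.isUnderConstruction()) {
        return null;                    // no feature attached
      }
      return file.getFileUnderConstructionFeature().getClientName();
    }
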
@@ -69,7 +69,6 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
-import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
boolean checkNamenodeBeforeReturn() throws Exception {
|
boolean checkNamenodeBeforeReturn() throws Exception {
|
||||||
INodeFileUnderConstruction fileNode = (INodeFileUnderConstruction) cluster
|
INodeFile fileNode = cluster.getNamesystem(0).getFSDirectory()
|
||||||
.getNamesystem(0).getFSDirectory().getINode4Write(file).asFile();
|
.getINode4Write(file).asFile();
|
||||||
BlockInfoUnderConstruction blkUC =
|
BlockInfoUnderConstruction blkUC =
|
||||||
(BlockInfoUnderConstruction) (fileNode.getBlocks())[1];
|
(BlockInfoUnderConstruction) (fileNode.getBlocks())[1];
|
||||||
int datanodeNum = blkUC.getExpectedStorageLocations().length;
|
int datanodeNum = blkUC.getExpectedStorageLocations().length;
|
||||||
|
@@ -1227,8 +1227,9 @@ public void testRenameAndAppend() throws Exception {
       out.write(content);
       fooRef = fsdir.getINode4Write(foo2.toString());
       assertTrue(fooRef instanceof INodeReference.DstReference);
-      INode fooNode = fooRef.asFile();
-      assertTrue(fooNode instanceof INodeFileUnderConstructionWithSnapshot);
+      INodeFile fooNode = fooRef.asFile();
+      assertTrue(fooNode instanceof INodeFileWithSnapshot);
+      assertTrue(fooNode.isUnderConstruction());
     } finally {
       if (out != null) {
         out.close();
@@ -1237,8 +1238,9 @@ public void testRenameAndAppend() throws Exception {
 
     fooRef = fsdir.getINode4Write(foo2.toString());
     assertTrue(fooRef instanceof INodeReference.DstReference);
-    INode fooNode = fooRef.asFile();
+    INodeFile fooNode = fooRef.asFile();
     assertTrue(fooNode instanceof INodeFileWithSnapshot);
+    assertFalse(fooNode.isUnderConstruction());
 
     restartClusterAndCheckImage(true);
   }
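
These two hunks show the payoff for snapshots: the node stays an INodeFileWithSnapshot across the whole append, and only its isUnderConstruction() answer flips when the stream is closed. Being snapshotted and being open for write are now independent properties of one object rather than a four-way class hierarchy. A self-contained sketch of that collapse (hypothetical stand-in types, not the patch's code):

    // Sketch: with UC as a feature, "snapshotted" and "open for write" vary
    // independently on one object, covering the four combinations that
    // previously needed INodeFile, INodeFileUnderConstruction,
    // INodeFileWithSnapshot and INodeFileUnderConstructionWithSnapshot.
    class FileNode {
      private boolean hasSnapshotDiffs;   // stand-in for the FileDiffList
      private Object ucFeature;           // stand-in for the UC feature

      void recordSnapshotDiff() { hasSnapshotDiffs = true; }
      void openForWrite()       { ucFeature = new Object(); }
      void close()              { ucFeature = null; }

      boolean isSnapshotted()       { return hasSnapshotDiffs; }
      boolean isUnderConstruction() { return ucFeature != null; }
    }
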
@@ -314,7 +314,9 @@ public void testDeletionWithZeroSizeBlock() throws Exception {
     assertEquals(BLOCKSIZE, blks[0].getNumBytes());
   }
 
-  /** Make sure we delete 0-sized block when deleting an INodeFileUC */
+  /**
+   * Make sure we delete 0-sized block when deleting an under-construction file
+   */
   @Test
   public void testDeletionWithZeroSizeBlock2() throws Exception {
     final Path foo = new Path("/foo");