HDFS-2718. Optimize OP_ADD in edits loading. Contributed by Konstantin Shvachko.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1239769 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Konstantin Shvachko 2012-02-02 19:12:42 +00:00
parent 12f83b380a
commit 4c78384ecb
8 changed files with 193 additions and 57 deletions

View File

@ -1544,6 +1544,8 @@ Release 0.22.1 - Unreleased
OPTIMIZATIONS
HDFS-2718. Optimize OP_ADD in edits loading. (shv)
BUG FIXES
Release 0.22.0 - 2011-11-29

View File

@ -158,9 +158,6 @@ public class BlockInfoUnderConstruction extends BlockInfo {
// Convert this under-construction block into a finalized BlockInfo.
// NOTE(review): this is diff-rendered text; the COMMITTED check below may be
// the pre-patch version of the method — confirm against the repository.
BlockInfo convertToCompleteBlock() throws IOException {
// A block already in COMPLETE state must never be converted again.
assert getBlockUCState() != BlockUCState.COMPLETE :
"Trying to convert a COMPLETE block";
// Only a client-committed block may be finalized.
boolean committed = getBlockUCState() == BlockUCState.COMMITTED;
if (!committed) {
throw new IOException(
"Cannot complete block: block has not been COMMITTED by the client");
}
// Copy-construct the completed replacement for this block.
return new BlockInfo(this);
}

View File

@ -439,15 +439,23 @@ public class BlockManager {
*/
private BlockInfo completeBlock(final INodeFile fileINode,
final int blkIndex) throws IOException {
return completeBlock(fileINode, blkIndex, false);
}
public BlockInfo completeBlock(final INodeFile fileINode,
final int blkIndex, final boolean force) throws IOException {
if(blkIndex < 0)
return null;
BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
if(curBlock.isComplete())
return curBlock;
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
if(ucBlock.numNodes() < minReplication)
if(!force && ucBlock.numNodes() < minReplication)
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
throw new IOException(
"Cannot complete block: block has not been COMMITTED by the client");
BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
// replace penultimate block in file
fileINode.setBlock(blkIndex, completeBlock);

View File

@ -287,22 +287,28 @@ public class FSDirectory implements Closeable {
short replication,
long modificationTime,
long atime,
long preferredBlockSize)
long preferredBlockSize,
String clientName,
String clientMachine)
throws UnresolvedLinkException {
INode newNode;
long diskspace = UNKNOWN_DISK_SPACE;
assert hasWriteLock();
if (blocks == null)
newNode = new INodeDirectory(permissions, modificationTime);
else {
else if(blocks.length == 0 || blocks[blocks.length-1].getBlockUCState()
== BlockUCState.UNDER_CONSTRUCTION) {
newNode = new INodeFileUnderConstruction(
permissions, blocks.length, replication,
preferredBlockSize, modificationTime, clientName,
clientMachine, null);
} else {
newNode = new INodeFile(permissions, blocks.length, replication,
modificationTime, atime, preferredBlockSize);
diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
}
writeLock();
try {
try {
newNode = addNode(path, newNode, diskspace);
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
if(newNode != null && blocks != null) {
int nrBlocks = blocks.length;
// Add file->block mapping
@ -321,6 +327,74 @@ public class FSDirectory implements Closeable {
}
/**
* Update a file's in-memory data structures with new block information,
* as recorded by an edit-log OP_ADD/OP_CLOSE for an already-existing file.
* Handles three cases relative to the file's current block list: same
* length (generation-stamp/length update of the last block), one block
* fewer (abandoned block), or more blocks (newly allocated blocks).
* @param file the existing INode for the file being updated
* @param path path of the file; used only for error messages
* @param permissions unused here — kept for signature parity with
*                    unprotectedAddFile (presumably; verify against callers)
* @param blocks the block list from the edit-log record
* @param replication unused here; see note on permissions
* @param mtime new modification time, set with "force" semantics
* @param atime new access time
* @param preferredBlockSize unused here; see note on permissions
* @throws IOException if the new block list cannot be reconciled with the
*                     old one (mismatched IDs/gen stamps, removing a block
*                     from a non-under-construction file, or removing more
*                     than one block at a time)
*/
void updateFile(INodeFile file,
String path,
PermissionStatus permissions,
BlockInfo[] blocks,
short replication,
long mtime,
long atime,
long preferredBlockSize) throws IOException {

// Update the salient file attributes.
file.setAccessTime(atime);
// "Force" variant: overwrites mtime unconditionally (replaying edits).
file.setModificationTimeForce(mtime);

// Update its block list
BlockInfo[] oldBlocks = file.getBlocks();

// Are we only updating the last block's gen stamp.
// Equal lengths means no blocks added/removed; only the last block's
// generation stamp is allowed to differ in that case (see loop below).
boolean isGenStampUpdate = oldBlocks.length == blocks.length;

// First, update blocks in common: copy length and gen stamp from the
// edit-log record into the existing in-memory BlockInfo objects so the
// blocksMap entries stay valid.
BlockInfo oldBlock = null;
for (int i = 0; i < oldBlocks.length && i < blocks.length; i++) {
oldBlock = oldBlocks[i];
Block newBlock = blocks[i];

boolean isLastBlock = i == oldBlocks.length - 1;
// Block IDs must match position-for-position; a gen-stamp difference is
// tolerated only on the last block of a pure gen-stamp update.
if (oldBlock.getBlockId() != newBlock.getBlockId() ||
(oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() &&
!(isGenStampUpdate && isLastBlock))) {
throw new IOException("Mismatched block IDs or generation stamps, " +
"attempting to replace block " + oldBlock + " with " + newBlock +
" as block # " + i + "/" + blocks.length + " of " + path);
}

oldBlock.setNumBytes(newBlock.getNumBytes());
oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
}

if (blocks.length < oldBlocks.length) {
// We're removing a block from the file, e.g. abandonBlock(...)
if (!file.isUnderConstruction()) {
throw new IOException("Trying to remove a block from file " +
path + " which is not under construction.");
}
// Only the single trailing block may be removed in one edit record.
if (blocks.length != oldBlocks.length - 1) {
throw new IOException("Trying to remove more than one block from file "
+ path);
}
unprotectedRemoveBlock(path,
(INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]);
} else if (blocks.length > oldBlocks.length) {
// We're adding blocks
// First complete last old Block: force=true because during edit replay
// the replica count / COMMITTED state cannot be verified.
getBlockManager().completeBlock(file, oldBlocks.length-1, true);
// Add the new blocks: register each in the blocksMap and append to the
// file's block list.
for (int i = oldBlocks.length; i < blocks.length; i++) {
// addBlock();
BlockInfo newBI = blocks[i];
getBlockManager().addINode(newBI, file);
file.addBlock(newBI);
}
}
}
INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
INode newNode, boolean propagateModTime) throws UnresolvedLinkException {
// NOTE: This does not update space counts for parents
@ -442,12 +516,21 @@ public class FSDirectory implements Closeable {
writeLock();
try {
unprotectedRemoveBlock(path, fileNode, block);
// write modified block locations to log
fsImage.getEditLog().logOpenFile(path, fileNode);
} finally {
writeUnlock();
}
return true;
}
void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode,
Block block) throws IOException {
// modify file-> block and blocksMap
fileNode.removeLastBlock(block);
getBlockManager().removeBlockFromMap(block);
// write modified block locations to log
fsImage.getEditLog().logOpenFile(path, fileNode);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+path+" with "+block
@ -458,10 +541,6 @@ public class FSDirectory implements Closeable {
INode[] pathINodes = getExistingPathINodes(path);
updateCount(pathINodes, pathINodes.length-1, 0,
-fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
} finally {
writeUnlock();
}
return true;
}
/**

View File

@ -187,31 +187,53 @@ public class FSEditLogLoader {
" clientMachine " + addCloseOp.clientMachine);
}
fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime);
// There are four cases here:
// 1. OP_ADD to create a new file
// 2. OP_ADD to update file blocks
// 3. OP_ADD to open file for append
// 4. OP_CLOSE to close the file
// add to the file tree
INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
addCloseOp.path, permissions,
blocks, replication,
// See if the file already exists
INodeFile oldFile = fsDir.getFileINode(addCloseOp.path);
if (oldFile == null) { // OP_ADD for a new file
assert addCloseOp.opCode == FSEditLogOpCodes.OP_ADD :
"Expected opcode OP_ADD, but got " + addCloseOp.opCode;
fsDir.unprotectedAddFile(
addCloseOp.path, permissions, blocks, replication,
addCloseOp.mtime, addCloseOp.atime, blockSize,
addCloseOp.clientName, addCloseOp.clientMachine);
} else {
fsDir.updateFile(oldFile,
addCloseOp.path, permissions, blocks, replication,
addCloseOp.mtime, addCloseOp.atime, blockSize);
if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
//
// Replace current node with a INodeUnderConstruction.
// Recreate in-memory lease record.
//
if(addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE) { // OP_CLOSE
assert oldFile.isUnderConstruction() :
"File is not under construction: " + addCloseOp.path;
fsNamesys.getBlockManager().completeBlock(
oldFile, blocks.length-1, true);
INodeFile newFile =
((INodeFileUnderConstruction)oldFile).convertToInodeFile();
fsDir.replaceNode(addCloseOp.path, oldFile, newFile);
} else if(! oldFile.isUnderConstruction()) { // OP_ADD for append
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
node.getLocalNameBytes(),
node.getReplication(),
node.getModificationTime(),
node.getPreferredBlockSize(),
node.getBlocks(),
node.getPermissionStatus(),
oldFile.getLocalNameBytes(),
oldFile.getReplication(),
oldFile.getModificationTime(),
oldFile.getPreferredBlockSize(),
oldFile.getBlocks(),
oldFile.getPermissionStatus(),
addCloseOp.clientName,
addCloseOp.clientMachine,
null);
fsDir.replaceNode(addCloseOp.path, node, cons);
fsNamesys.leaseManager.addLease(cons.getClientName(),
addCloseOp.path);
fsDir.replaceNode(addCloseOp.path, oldFile, cons);
}
}
// Update file lease
if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
} else { // Ops.OP_CLOSE
fsNamesys.leaseManager.removeLease(
((INodeFileUnderConstruction)oldFile).getClientName(), addCloseOp.path);
}
break;
}

View File

@ -41,8 +41,20 @@ public class INodeFileUnderConstruction extends INodeFile {
String clientName,
String clientMachine,
DatanodeDescriptor clientNode) {
super(permissions.applyUMask(UMASK), 0, replication, modTime, modTime,
preferredBlockSize);
this(permissions, 0, replication, preferredBlockSize, modTime,
clientName, clientMachine, clientNode);
}
INodeFileUnderConstruction(PermissionStatus permissions,
int nrBlocks,
short replication,
long preferredBlockSize,
long modTime,
String clientName,
String clientMachine,
DatanodeDescriptor clientNode) {
super(permissions.applyUMask(UMASK), nrBlocks, replication,
modTime, modTime, preferredBlockSize);
this.clientName = clientName;
this.clientMachine = clientMachine;
this.clientNode = clientNode;

View File

@ -72,12 +72,20 @@ public class TestAbandonBlock {
// Now abandon the last block
DFSClient dfsclient = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(src, 0, 1);
LocatedBlocks blocks =
dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
int orginalNumBlocks = blocks.locatedBlockCount();
LocatedBlock b = blocks.getLastLocatedBlock();
dfsclient.getNamenode().abandonBlock(b.getBlock(), src, dfsclient.clientName);
// And close the file
fout.close();
// Close cluster and check the block has been abandoned after restart
cluster.restartNameNode();
blocks = dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
assert orginalNumBlocks == blocks.locatedBlockCount() + 1 :
"Blocks " + b + " has not been abandoned.";
}
@Test

View File

@ -111,10 +111,12 @@ public class TestEditLog extends TestCase {
int numTransactions;
short replication = 3;
long blockSize = 64;
int startIndex;
Transactions(FSNamesystem ns, int num) {
Transactions(FSNamesystem ns, int numTx, int startIdx) {
namesystem = ns;
numTransactions = num;
numTransactions = numTx;
startIndex = startIdx;
}
// add a bunch of transactions.
@ -126,8 +128,8 @@ public class TestEditLog extends TestCase {
for (int i = 0; i < numTransactions; i++) {
INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
p, replication, blockSize, 0, "", "", null);
editLog.logOpenFile("/filename" + i, inode);
editLog.logCloseFile("/filename" + i, inode);
editLog.logOpenFile("/filename" + startIndex + i, inode);
editLog.logCloseFile("/filename" + startIndex + i, inode);
editLog.logSync();
}
}
@ -275,7 +277,8 @@ public class TestEditLog extends TestCase {
// Create threads and make them run transactions concurrently.
Thread threadId[] = new Thread[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS);
Transactions trans =
new Transactions(namesystem, NUM_TRANSACTIONS, i*NUM_TRANSACTIONS);
threadId[i] = new Thread(trans, "TransactionThread-" + i);
threadId[i].start();
}
@ -289,10 +292,15 @@ public class TestEditLog extends TestCase {
}
}
// Reopen some files as for append
Transactions trans =
new Transactions(namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2);
trans.run();
// Roll another time to finalize edits_inprogress_3
fsimage.rollEditLog();
long expectedTxns = (NUM_THREADS * 2 * NUM_TRANSACTIONS) + 2; // +2 for start/end txns
long expectedTxns = ((NUM_THREADS+1) * 2 * NUM_TRANSACTIONS) + 2; // +2 for start/end txns
// Verify that we can read in all the transactions that we have written.
// If there were any corruptions, it is likely that the reading in