diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7c6d93cb230..0364a347ed4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1544,6 +1544,8 @@ Release 0.22.1 - Unreleased OPTIMIZATIONS + HDFS-2718. Optimize OP_ADD in edits loading. (shv) + BUG FIXES Release 0.22.0 - 2011-11-29 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java index 29565ace47d..1695cabb521 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java @@ -158,9 +158,6 @@ public class BlockInfoUnderConstruction extends BlockInfo { BlockInfo convertToCompleteBlock() throws IOException { assert getBlockUCState() != BlockUCState.COMPLETE : "Trying to convert a COMPLETE block"; - if(getBlockUCState() != BlockUCState.COMMITTED) - throw new IOException( - "Cannot complete block: block has not been COMMITTED by the client"); return new BlockInfo(this); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index fb045b7195f..19968f75fd4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -439,15 +439,23 @@ public class BlockManager { */ private BlockInfo completeBlock(final INodeFile fileINode, final int blkIndex) throws IOException { + return completeBlock(fileINode, blkIndex, false); + } + + public BlockInfo completeBlock(final INodeFile fileINode, + final int blkIndex, final boolean force) throws IOException { if(blkIndex < 0) return null; BlockInfo curBlock = fileINode.getBlocks()[blkIndex]; if(curBlock.isComplete()) return curBlock; BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock; - if(ucBlock.numNodes() < minReplication) + if(!force && ucBlock.numNodes() < minReplication) throw new IOException("Cannot complete block: " + "block does not satisfy minimal replication requirement."); + if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED) + throw new IOException( + "Cannot complete block: block has not been COMMITTED by the client"); BlockInfo completeBlock = ucBlock.convertToCompleteBlock(); // replace penultimate block in file fileINode.setBlock(blkIndex, completeBlock); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 93684b627b2..b956ce0b2ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -287,22 +287,28 @@ public class FSDirectory implements Closeable { short replication, long modificationTime, long atime, - long preferredBlockSize) + long preferredBlockSize, + String clientName, + String clientMachine) throws UnresolvedLinkException { INode newNode; - long diskspace = UNKNOWN_DISK_SPACE; assert hasWriteLock(); if (blocks == null) newNode = new INodeDirectory(permissions, modificationTime); - else { + else if(blocks.length == 0 || blocks[blocks.length-1].getBlockUCState() + == BlockUCState.UNDER_CONSTRUCTION) { + newNode = new INodeFileUnderConstruction( + permissions, blocks.length, replication, + preferredBlockSize, modificationTime, clientName, + clientMachine, null); + } else { newNode = new INodeFile(permissions, blocks.length, replication, modificationTime, atime, preferredBlockSize); - diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks); } writeLock(); try { try { - newNode = addNode(path, newNode, diskspace); + newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE); if(newNode != null && blocks != null) { int nrBlocks = blocks.length; // Add file->block mapping @@ -321,6 +327,74 @@ public class FSDirectory implements Closeable { } + /** + * Update files in-memory data structures with new block information. + * @throws IOException + */ + void updateFile(INodeFile file, + String path, + PermissionStatus permissions, + BlockInfo[] blocks, + short replication, + long mtime, + long atime, + long preferredBlockSize) throws IOException { + + // Update the salient file attributes. + file.setAccessTime(atime); + file.setModificationTimeForce(mtime); + + // Update its block list + BlockInfo[] oldBlocks = file.getBlocks(); + + // Are we only updating the last block's gen stamp. + boolean isGenStampUpdate = oldBlocks.length == blocks.length; + + // First, update blocks in common + BlockInfo oldBlock = null; + for (int i = 0; i < oldBlocks.length && i < blocks.length; i++) { + oldBlock = oldBlocks[i]; + Block newBlock = blocks[i]; + + boolean isLastBlock = i == oldBlocks.length - 1; + if (oldBlock.getBlockId() != newBlock.getBlockId() || + (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() && + !(isGenStampUpdate && isLastBlock))) { + throw new IOException("Mismatched block IDs or generation stamps, " + + "attempting to replace block " + oldBlock + " with " + newBlock + + " as block # " + i + "/" + blocks.length + " of " + path); + } + + oldBlock.setNumBytes(newBlock.getNumBytes()); + oldBlock.setGenerationStamp(newBlock.getGenerationStamp()); + } + + if (blocks.length < oldBlocks.length) { + // We're removing a block from the file, e.g. abandonBlock(...) + if (!file.isUnderConstruction()) { + throw new IOException("Trying to remove a block from file " + + path + " which is not under construction."); + } + if (blocks.length != oldBlocks.length - 1) { + throw new IOException("Trying to remove more than one block from file " + + path); + } + unprotectedRemoveBlock(path, + (INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]); + } else if (blocks.length > oldBlocks.length) { + // We're adding blocks + // First complete last old Block + getBlockManager().completeBlock(file, oldBlocks.length-1, true); + // Add the new blocks + for (int i = oldBlocks.length; i < blocks.length; i++) { + // addBlock(); + BlockInfo newBI = blocks[i]; + getBlockManager().addINode(newBI, file); + file.addBlock(newBI); + } + } + } + INodeDirectory addToParent(byte[] src, INodeDirectory parentINode, INode newNode, boolean propagateModTime) throws UnresolvedLinkException { // NOTE: This does not update space counts for parents @@ -442,28 +516,33 @@ public class FSDirectory implements Closeable { writeLock(); try { - // modify file-> block and blocksMap - fileNode.removeLastBlock(block); - getBlockManager().removeBlockFromMap(block); - + unprotectedRemoveBlock(path, fileNode, block); // write modified block locations to log fsImage.getEditLog().logOpenFile(path, fileNode); - if(NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: " - +path+" with "+block - +" block is removed from the file system"); - } - - // update space consumed - INode[] pathINodes = getExistingPathINodes(path); - updateCount(pathINodes, pathINodes.length-1, 0, - -fileNode.getPreferredBlockSize()*fileNode.getReplication(), true); } finally { writeUnlock(); } return true; } + void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode, + Block block) throws IOException { + // modify file-> block and blocksMap + fileNode.removeLastBlock(block); + getBlockManager().removeBlockFromMap(block); + + if(NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: " + +path+" with "+block + +" block is removed from the file system"); + } + + // update space consumed + INode[] pathINodes = getExistingPathINodes(path); + updateCount(pathINodes, pathINodes.length-1, 0, + -fileNode.getPreferredBlockSize()*fileNode.getReplication(), true); + } + /** * @see #unprotectedRenameTo(String, String, long) * @deprecated Use {@link #renameTo(String, String, Rename...)} instead. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 51865c82de1..07bc62f5be0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -187,31 +187,53 @@ public class FSEditLogLoader { " clientMachine " + addCloseOp.clientMachine); } - fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime); + // There are four cases here: + // 1. OP_ADD to create a new file + // 2. OP_ADD to update file blocks + // 3. OP_ADD to open file for append + // 4. OP_CLOSE to close the file - // add to the file tree - INodeFile node = (INodeFile)fsDir.unprotectedAddFile( - addCloseOp.path, permissions, - blocks, replication, - addCloseOp.mtime, addCloseOp.atime, blockSize); - if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) { - // - // Replace current node with a INodeUnderConstruction. - // Recreate in-memory lease record. - // - INodeFileUnderConstruction cons = new INodeFileUnderConstruction( - node.getLocalNameBytes(), - node.getReplication(), - node.getModificationTime(), - node.getPreferredBlockSize(), - node.getBlocks(), - node.getPermissionStatus(), - addCloseOp.clientName, - addCloseOp.clientMachine, - null); - fsDir.replaceNode(addCloseOp.path, node, cons); - fsNamesys.leaseManager.addLease(cons.getClientName(), - addCloseOp.path); + // See if the file already exists + INodeFile oldFile = fsDir.getFileINode(addCloseOp.path); + if (oldFile == null) { // OP_ADD for a new file + assert addCloseOp.opCode == FSEditLogOpCodes.OP_ADD : + "Expected opcode OP_ADD, but got " + addCloseOp.opCode; + fsDir.unprotectedAddFile( + addCloseOp.path, permissions, blocks, replication, + addCloseOp.mtime, addCloseOp.atime, blockSize, + addCloseOp.clientName, addCloseOp.clientMachine); + } else { + fsDir.updateFile(oldFile, + addCloseOp.path, permissions, blocks, replication, + addCloseOp.mtime, addCloseOp.atime, blockSize); + if(addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE) { // OP_CLOSE + assert oldFile.isUnderConstruction() : + "File is not under construction: " + addCloseOp.path; + fsNamesys.getBlockManager().completeBlock( + oldFile, blocks.length-1, true); + INodeFile newFile = + ((INodeFileUnderConstruction)oldFile).convertToInodeFile(); + fsDir.replaceNode(addCloseOp.path, oldFile, newFile); + } else if(! oldFile.isUnderConstruction()) { // OP_ADD for append + INodeFileUnderConstruction cons = new INodeFileUnderConstruction( + oldFile.getLocalNameBytes(), + oldFile.getReplication(), + oldFile.getModificationTime(), + oldFile.getPreferredBlockSize(), + oldFile.getBlocks(), + oldFile.getPermissionStatus(), + addCloseOp.clientName, + addCloseOp.clientMachine, + null); + fsDir.replaceNode(addCloseOp.path, oldFile, cons); + } + } + // Update file lease + if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) { + fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path); + } else { // Ops.OP_CLOSE + fsNamesys.leaseManager.removeLease( + ((INodeFileUnderConstruction)oldFile).getClientName(), addCloseOp.path); } break; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java index 2440c4dd122..0fab53c95ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java @@ -41,8 +41,20 @@ public class INodeFileUnderConstruction extends INodeFile { String clientName, String clientMachine, DatanodeDescriptor clientNode) { - super(permissions.applyUMask(UMASK), 0, replication, modTime, modTime, - preferredBlockSize); + this(permissions, 0, replication, preferredBlockSize, modTime, + clientName, clientMachine, clientNode); + } + + INodeFileUnderConstruction(PermissionStatus permissions, + int nrBlocks, + short replication, + long preferredBlockSize, + long modTime, + String clientName, + String clientMachine, + DatanodeDescriptor clientNode) { + super(permissions.applyUMask(UMASK), nrBlocks, replication, + modTime, modTime, preferredBlockSize); this.clientName = clientName; this.clientMachine = clientMachine; this.clientNode = clientNode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java index 1613e82ca2d..582767c8010 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java @@ -72,12 +72,20 @@ public class TestAbandonBlock { // Now abandon the last block DFSClient dfsclient = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs); - LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(src, 0, 1); + LocatedBlocks blocks = + dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE); + int orginalNumBlocks = blocks.locatedBlockCount(); LocatedBlock b = blocks.getLastLocatedBlock(); dfsclient.getNamenode().abandonBlock(b.getBlock(), src, dfsclient.clientName); // And close the file fout.close(); + + // Close cluster and check the block has been abandoned after restart + cluster.restartNameNode(); + blocks = dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE); + assert orginalNumBlocks == blocks.locatedBlockCount() + 1 : + "Blocks " + b + " has not been abandoned."; } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index 156f8415bae..1b01a2fb0f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -111,10 +111,12 @@ public class TestEditLog extends TestCase { int numTransactions; short replication = 3; long blockSize = 64; + int startIndex; - Transactions(FSNamesystem ns, int num) { + Transactions(FSNamesystem ns, int numTx, int startIdx) { namesystem = ns; - numTransactions = num; + numTransactions = numTx; + startIndex = startIdx; } // add a bunch of transactions. @@ -126,8 +128,8 @@ public class TestEditLog extends TestCase { for (int i = 0; i < numTransactions; i++) { INodeFileUnderConstruction inode = new INodeFileUnderConstruction( p, replication, blockSize, 0, "", "", null); - editLog.logOpenFile("/filename" + i, inode); - editLog.logCloseFile("/filename" + i, inode); + editLog.logOpenFile("/filename" + startIndex + i, inode); + editLog.logCloseFile("/filename" + startIndex + i, inode); editLog.logSync(); } } @@ -275,7 +277,8 @@ public class TestEditLog extends TestCase { // Create threads and make them run transactions concurrently. Thread threadId[] = new Thread[NUM_THREADS]; for (int i = 0; i < NUM_THREADS; i++) { - Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS); + Transactions trans = + new Transactions(namesystem, NUM_TRANSACTIONS, i*NUM_TRANSACTIONS); threadId[i] = new Thread(trans, "TransactionThread-" + i); threadId[i].start(); } @@ -288,11 +291,16 @@ public class TestEditLog extends TestCase { i--; // retry } } - + + // Reopen some files as for append + Transactions trans = + new Transactions(namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2); + trans.run(); + // Roll another time to finalize edits_inprogress_3 fsimage.rollEditLog(); - long expectedTxns = (NUM_THREADS * 2 * NUM_TRANSACTIONS) + 2; // +2 for start/end txns + long expectedTxns = ((NUM_THREADS+1) * 2 * NUM_TRANSACTIONS) + 2; // +2 for start/end txns // Verify that we can read in all the transactions that we have written. // If there were any corruptions, it is likely that the reading in