From e964bbc856bbf272e1ae024750001a470c082c2c Mon Sep 17 00:00:00 2001 From: Konstantin V Shvachko Date: Thu, 9 Apr 2015 22:00:20 -0700 Subject: [PATCH] HDFS-8081. Split getAdditionalBlock() into two methods. Contributed by Konstantin Shvachko --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hdfs/server/namenode/FSNamesystem.java | 46 +++++++++-- .../server/namenode/TestAddBlockRetry.java | 79 ++++++------------- 3 files changed, 65 insertions(+), 62 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7b540cc5b0a..1df106a13ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -8,6 +8,8 @@ Release 2.7.1 - UNRELEASED IMPROVEMENTS + HDFS-8081. Split getAdditionalBlock() into two methods. (shv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index f3ae849a985..333e1524ef7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3009,6 +3009,31 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, LocatedBlock getAdditionalBlock(String src, long fileId, String clientName, ExtendedBlock previous, Set<Node> excludedNodes, List<String> favoredNodes) throws IOException { + LocatedBlock[] onRetryBlock = new LocatedBlock[1]; + DatanodeStorageInfo targets[] = getNewBlockTargets(src, fileId, + clientName, previous, excludedNodes, favoredNodes, onRetryBlock); + if (targets == null) { + assert onRetryBlock[0] != null : "Retry block is null"; + // This is a retry. Just return the last block. 
+ return onRetryBlock[0]; + } + LocatedBlock newBlock = storeAllocatedBlock( + src, fileId, clientName, previous, targets); + return newBlock; + } + + /** + * Part I of getAdditionalBlock(). + * Analyze the state of the file under read lock to determine if the client + * can add a new block, detect potential retries, lease mismatches, + * and minimal replication of the penultimate block. + * + * Generate target DataNode locations for the new block, + * but do not create the new block yet. + */ + DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId, + String clientName, ExtendedBlock previous, Set<Node> excludedNodes, + List<String> favoredNodes, LocatedBlock[] onRetryBlock) throws IOException { final long blockSize; final int replication; final byte storagePolicyID; @@ -3020,7 +3045,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, + src + " inodeId " + fileId + " for " + clientName); } - // Part I. Analyze the state of the file with respect to the input data. checkOperation(OperationCategory.READ); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); FSPermissionChecker pc = getPermissionChecker(); @@ -3028,7 +3052,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, try { checkOperation(OperationCategory.READ); src = dir.resolvePath(pc, src, pathComponents); - LocatedBlock[] onRetryBlock = new LocatedBlock[1]; FileState fileState = analyzeFileState( src, fileId, clientName, previous, onRetryBlock); final INodeFile pendingFile = fileState.inode; @@ -3039,8 +3062,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, src = fileState.path; if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) { - // This is a retry. Just return the last block if having locations. - return onRetryBlock[0]; + // This is a retry. No need to generate new locations. + // Use the last block if it has locations. 
+ return null; } if (pendingFile.getBlocks().length >= maxBlocksPerFile) { throw new IOException("File has reached the limit on maximum number of" @@ -3064,12 +3088,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } // choose targets for the new block to be allocated. - final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget4NewBlock( + return getBlockManager().chooseTarget4NewBlock( src, replication, clientNode, excludedNodes, blockSize, favoredNodes, storagePolicyID); + } - // Part II. - // Allocate a new block, add it to the INode and the BlocksMap. + /** + * Part II of getAdditionalBlock(). + * Should repeat the same analysis of the file state as in Part I, + * but under the write lock. + * If the conditions still hold, then allocate a new block with + * the new targets, add it to the INode and to the BlocksMap. + */ + LocatedBlock storeAllocatedBlock(String src, long fileId, String clientName, + ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException { Block newBlock = null; long offset; checkOperation(OperationCategory.WRITE); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java index 671f61d1b47..d6d2b5e8258 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java @@ -20,16 +20,10 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; - import java.io.IOException; -import java.lang.reflect.Field; import java.util.EnumSet; -import 
java.util.HashSet; -import java.util.List; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,18 +32,12 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.io.EnumSetWritable; -import org.apache.hadoop.net.Node; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; /** * Race between two threads simultaneously calling @@ -63,10 +51,6 @@ public class TestAddBlockRetry { private Configuration conf; private MiniDFSCluster cluster; - private int count = 0; - private LocatedBlock lb1; - private LocatedBlock lb2; - @Before public void setUp() throws Exception { conf = new Configuration(); @@ -92,43 +76,8 @@ public class TestAddBlockRetry { final String src = "/testRetryAddBlockWhileInChooseTarget"; final FSNamesystem ns = cluster.getNamesystem(); - BlockManager spyBM = spy(ns.getBlockManager()); final NamenodeProtocols nn = cluster.getNameNodeRpc(); - // substitute mocked BlockManager into FSNamesystem - Class<? extends FSNamesystem> nsClass = ns.getClass(); - Field bmField = nsClass.getDeclaredField("blockManager"); - bmField.setAccessible(true); - bmField.set(ns, spyBM); - - doAnswer(new Answer<DatanodeStorageInfo[]>() { - @Override - public DatanodeStorageInfo[] answer(InvocationOnMock invocation) - throws Throwable { - LOG.info("chooseTarget for " + src); - DatanodeStorageInfo[] ret = - (DatanodeStorageInfo[]) invocation.callRealMethod(); - 
assertTrue("Penultimate block must be complete", - checkFileProgress(src, false)); - count++; - if(count == 1) { // run second addBlock() - LOG.info("Starting second addBlock for " + src); - nn.addBlock(src, "clientName", null, null, - INodeId.GRANDFATHER_INODE_ID, null); - assertTrue("Penultimate block must be complete", - checkFileProgress(src, false)); - LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE); - assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size()); - lb2 = lbs.get(0); - assertEquals("Wrong replication", - REPLICATION, lb2.getLocations().length); - } - return ret; - } - }).when(spyBM).chooseTarget4NewBlock(Mockito.anyString(), Mockito.anyInt(), - Mockito.<DatanodeDescriptor>any(), Mockito.<HashSet<Node>>any(), - Mockito.anyLong(), Mockito.<List<String>>any(), Mockito.anyByte()); - // create file nn.create(src, FsPermission.getFileDefault(), "clientName", @@ -137,12 +86,32 @@ public class TestAddBlockRetry { // start first addBlock() LOG.info("Starting first addBlock for " + src); - nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null); + LocatedBlock[] onRetryBlock = new LocatedBlock[1]; + DatanodeStorageInfo targets[] = ns.getNewBlockTargets( + src, INodeId.GRANDFATHER_INODE_ID, "clientName", + null, null, null, onRetryBlock); + assertNotNull("Targets must be generated", targets); - // check locations + // run second addBlock() + LOG.info("Starting second addBlock for " + src); + nn.addBlock(src, "clientName", null, null, + INodeId.GRANDFATHER_INODE_ID, null); + assertTrue("Penultimate block must be complete", + checkFileProgress(src, false)); LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE); assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size()); - lb1 = lbs.get(0); + LocatedBlock lb2 = lbs.get(0); + assertEquals("Wrong replication", REPLICATION, lb2.getLocations().length); + + // continue first addBlock() + LocatedBlock newBlock = ns.storeAllocatedBlock( + src, INodeId.GRANDFATHER_INODE_ID, "clientName", null, 
targets); + assertEquals("Blocks are not equal", lb2.getBlock(), newBlock.getBlock()); + + // check locations + lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE); + assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size()); + LocatedBlock lb1 = lbs.get(0); assertEquals("Wrong replication", REPLICATION, lb1.getLocations().length); assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock()); }