HDFS-8081. Split getAdditionalBlock() into two methods. Contributed by Konstantin Shvachko
This commit is contained in:
parent
af9d4fede5
commit
0959b67f1a
|
@ -474,6 +474,8 @@ Release 2.7.1 - UNRELEASED
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
HDFS-8081. Split getAdditionalBlock() into two methods. (shv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -3009,6 +3009,31 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
|
LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
|
||||||
ExtendedBlock previous, Set<Node> excludedNodes,
|
ExtendedBlock previous, Set<Node> excludedNodes,
|
||||||
List<String> favoredNodes) throws IOException {
|
List<String> favoredNodes) throws IOException {
|
||||||
|
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
||||||
|
DatanodeStorageInfo targets[] = getNewBlockTargets(src, fileId,
|
||||||
|
clientName, previous, excludedNodes, favoredNodes, onRetryBlock);
|
||||||
|
if (targets == null) {
|
||||||
|
assert onRetryBlock[0] != null : "Retry block is null";
|
||||||
|
// This is a retry. Just return the last block.
|
||||||
|
return onRetryBlock[0];
|
||||||
|
}
|
||||||
|
LocatedBlock newBlock = storeAllocatedBlock(
|
||||||
|
src, fileId, clientName, previous, targets);
|
||||||
|
return newBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Part I of getAdditionalBlock().
|
||||||
|
* Analyze the state of the file under read lock to determine if the client
|
||||||
|
* can add a new block, detect potential retries, lease mismatches,
|
||||||
|
* and minimal replication of the penultimate block.
|
||||||
|
*
|
||||||
|
* Generate target DataNode locations for the new block,
|
||||||
|
* but do not create the new block yet.
|
||||||
|
*/
|
||||||
|
DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId,
|
||||||
|
String clientName, ExtendedBlock previous, Set<Node> excludedNodes,
|
||||||
|
List<String> favoredNodes, LocatedBlock[] onRetryBlock) throws IOException {
|
||||||
final long blockSize;
|
final long blockSize;
|
||||||
final int replication;
|
final int replication;
|
||||||
final byte storagePolicyID;
|
final byte storagePolicyID;
|
||||||
|
@ -3020,7 +3045,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
+ src + " inodeId " + fileId + " for " + clientName);
|
+ src + " inodeId " + fileId + " for " + clientName);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Part I. Analyze the state of the file with respect to the input data.
|
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||||
FSPermissionChecker pc = getPermissionChecker();
|
FSPermissionChecker pc = getPermissionChecker();
|
||||||
|
@ -3028,7 +3052,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
src = dir.resolvePath(pc, src, pathComponents);
|
src = dir.resolvePath(pc, src, pathComponents);
|
||||||
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
|
||||||
FileState fileState = analyzeFileState(
|
FileState fileState = analyzeFileState(
|
||||||
src, fileId, clientName, previous, onRetryBlock);
|
src, fileId, clientName, previous, onRetryBlock);
|
||||||
final INodeFile pendingFile = fileState.inode;
|
final INodeFile pendingFile = fileState.inode;
|
||||||
|
@ -3039,8 +3062,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
src = fileState.path;
|
src = fileState.path;
|
||||||
|
|
||||||
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
|
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
|
||||||
// This is a retry. Just return the last block if having locations.
|
// This is a retry. No need to generate new locations.
|
||||||
return onRetryBlock[0];
|
// Use the last block if it has locations.
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
if (pendingFile.getBlocks().length >= maxBlocksPerFile) {
|
if (pendingFile.getBlocks().length >= maxBlocksPerFile) {
|
||||||
throw new IOException("File has reached the limit on maximum number of"
|
throw new IOException("File has reached the limit on maximum number of"
|
||||||
|
@ -3064,12 +3088,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
}
|
}
|
||||||
|
|
||||||
// choose targets for the new block to be allocated.
|
// choose targets for the new block to be allocated.
|
||||||
final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget4NewBlock(
|
return getBlockManager().chooseTarget4NewBlock(
|
||||||
src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
|
src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
|
||||||
storagePolicyID);
|
storagePolicyID);
|
||||||
|
}
|
||||||
|
|
||||||
// Part II.
|
/**
|
||||||
// Allocate a new block, add it to the INode and the BlocksMap.
|
* Part II of getAdditionalBlock().
|
||||||
|
* Should repeat the same analysis of the file state as in Part 1,
|
||||||
|
* but under the write lock.
|
||||||
|
* If the conditions still hold, then allocate a new block with
|
||||||
|
* the new targets, add it to the INode and to the BlocksMap.
|
||||||
|
*/
|
||||||
|
LocatedBlock storeAllocatedBlock(String src, long fileId, String clientName,
|
||||||
|
ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException {
|
||||||
Block newBlock = null;
|
Block newBlock = null;
|
||||||
long offset;
|
long offset;
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
|
|
|
@ -20,16 +20,10 @@ package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
|
||||||
import static org.mockito.Mockito.spy;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Field;
|
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -38,18 +32,12 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.net.Node;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
|
||||||
import org.mockito.stubbing.Answer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Race between two threads simultaneously calling
|
* Race between two threads simultaneously calling
|
||||||
|
@ -63,10 +51,6 @@ public class TestAddBlockRetry {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
|
|
||||||
private int count = 0;
|
|
||||||
private LocatedBlock lb1;
|
|
||||||
private LocatedBlock lb2;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
|
@ -92,43 +76,8 @@ public class TestAddBlockRetry {
|
||||||
final String src = "/testRetryAddBlockWhileInChooseTarget";
|
final String src = "/testRetryAddBlockWhileInChooseTarget";
|
||||||
|
|
||||||
final FSNamesystem ns = cluster.getNamesystem();
|
final FSNamesystem ns = cluster.getNamesystem();
|
||||||
BlockManager spyBM = spy(ns.getBlockManager());
|
|
||||||
final NamenodeProtocols nn = cluster.getNameNodeRpc();
|
final NamenodeProtocols nn = cluster.getNameNodeRpc();
|
||||||
|
|
||||||
// substitute mocked BlockManager into FSNamesystem
|
|
||||||
Class<? extends FSNamesystem> nsClass = ns.getClass();
|
|
||||||
Field bmField = nsClass.getDeclaredField("blockManager");
|
|
||||||
bmField.setAccessible(true);
|
|
||||||
bmField.set(ns, spyBM);
|
|
||||||
|
|
||||||
doAnswer(new Answer<DatanodeStorageInfo[]>() {
|
|
||||||
@Override
|
|
||||||
public DatanodeStorageInfo[] answer(InvocationOnMock invocation)
|
|
||||||
throws Throwable {
|
|
||||||
LOG.info("chooseTarget for " + src);
|
|
||||||
DatanodeStorageInfo[] ret =
|
|
||||||
(DatanodeStorageInfo[]) invocation.callRealMethod();
|
|
||||||
assertTrue("Penultimate block must be complete",
|
|
||||||
checkFileProgress(src, false));
|
|
||||||
count++;
|
|
||||||
if(count == 1) { // run second addBlock()
|
|
||||||
LOG.info("Starting second addBlock for " + src);
|
|
||||||
nn.addBlock(src, "clientName", null, null,
|
|
||||||
INodeId.GRANDFATHER_INODE_ID, null);
|
|
||||||
assertTrue("Penultimate block must be complete",
|
|
||||||
checkFileProgress(src, false));
|
|
||||||
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
|
|
||||||
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
|
|
||||||
lb2 = lbs.get(0);
|
|
||||||
assertEquals("Wrong replication",
|
|
||||||
REPLICATION, lb2.getLocations().length);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
}).when(spyBM).chooseTarget4NewBlock(Mockito.anyString(), Mockito.anyInt(),
|
|
||||||
Mockito.<DatanodeDescriptor>any(), Mockito.<HashSet<Node>>any(),
|
|
||||||
Mockito.anyLong(), Mockito.<List<String>>any(), Mockito.anyByte());
|
|
||||||
|
|
||||||
// create file
|
// create file
|
||||||
nn.create(src, FsPermission.getFileDefault(),
|
nn.create(src, FsPermission.getFileDefault(),
|
||||||
"clientName",
|
"clientName",
|
||||||
|
@ -137,12 +86,32 @@ public class TestAddBlockRetry {
|
||||||
|
|
||||||
// start first addBlock()
|
// start first addBlock()
|
||||||
LOG.info("Starting first addBlock for " + src);
|
LOG.info("Starting first addBlock for " + src);
|
||||||
nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null);
|
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
||||||
|
DatanodeStorageInfo targets[] = ns.getNewBlockTargets(
|
||||||
|
src, INodeId.GRANDFATHER_INODE_ID, "clientName",
|
||||||
|
null, null, null, onRetryBlock);
|
||||||
|
assertNotNull("Targets must be generated", targets);
|
||||||
|
|
||||||
// check locations
|
// run second addBlock()
|
||||||
|
LOG.info("Starting second addBlock for " + src);
|
||||||
|
nn.addBlock(src, "clientName", null, null,
|
||||||
|
INodeId.GRANDFATHER_INODE_ID, null);
|
||||||
|
assertTrue("Penultimate block must be complete",
|
||||||
|
checkFileProgress(src, false));
|
||||||
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
|
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
|
||||||
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
|
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
|
||||||
lb1 = lbs.get(0);
|
LocatedBlock lb2 = lbs.get(0);
|
||||||
|
assertEquals("Wrong replication", REPLICATION, lb2.getLocations().length);
|
||||||
|
|
||||||
|
// continue first addBlock()
|
||||||
|
LocatedBlock newBlock = ns.storeAllocatedBlock(
|
||||||
|
src, INodeId.GRANDFATHER_INODE_ID, "clientName", null, targets);
|
||||||
|
assertEquals("Blocks are not equal", lb2.getBlock(), newBlock.getBlock());
|
||||||
|
|
||||||
|
// check locations
|
||||||
|
lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
|
||||||
|
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
|
||||||
|
LocatedBlock lb1 = lbs.get(0);
|
||||||
assertEquals("Wrong replication", REPLICATION, lb1.getLocations().length);
|
assertEquals("Wrong replication", REPLICATION, lb1.getLocations().length);
|
||||||
assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock());
|
assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue