HDFS-4452. getAdditionalBlock() can create multiple blocks if the client times out and retries. Contributed by Konstantin Shvachko.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1441681 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Konstantin Shvachko 2013-02-02 02:07:26 +00:00
parent a04bee4567
commit 61a262757c
3 changed files with 305 additions and 108 deletions

View File

@ -744,6 +744,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4428. FsDatasetImpl should disclose what the error is when a rename
fails. (Colin Patrick McCabe via atm)
HDFS-4452. getAdditionalBlock() can create multiple blocks if the client
times out and retries. (shv)
BREAKDOWN OF HDFS-3077 SUBTASKS
HDFS-3077. Quorum-based protocol for reading and writing edit logs.

View File

@ -2200,12 +2200,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws LeaseExpiredException, NotReplicatedYetException,
QuotaExceededException, SafeModeException, UnresolvedLinkException,
IOException {
checkBlock(previous);
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
long fileLength, blockSize;
long blockSize;
int replication;
DatanodeDescriptor clientNode = null;
Block newBlock = null;
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
@ -2213,10 +2210,82 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+src+" for "+clientName);
}
// Part I. Analyze the state of the file with respect to the input data.
readLock();
try {
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
final INode[] inodes = analyzeFileState(
src, clientName, previous, onRetryBlock).getINodes();
final INodeFileUnderConstruction pendingFile =
(INodeFileUnderConstruction) inodes[inodes.length - 1];
if(onRetryBlock[0] != null) {
// This is a retry. Just return the last block.
return onRetryBlock[0];
}
blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();
replication = pendingFile.getBlockReplication();
} finally {
readUnlock();
}
// choose targets for the new block to be allocated.
final DatanodeDescriptor targets[] = getBlockManager().chooseTarget(
src, replication, clientNode, excludedNodes, blockSize);
// Part II.
// Allocate a new block, add it to the INode and the BlocksMap.
Block newBlock = null;
long offset;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
// Run the full analysis again, since things could have changed
// while chooseTarget() was executing.
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
INodesInPath inodesInPath =
analyzeFileState(src, clientName, previous, onRetryBlock);
INode[] inodes = inodesInPath.getINodes();
final INodeFileUnderConstruction pendingFile =
(INodeFileUnderConstruction) inodes[inodes.length - 1];
if(onRetryBlock[0] != null) {
// This is a retry. Just return the last block.
return onRetryBlock[0];
}
// commit the last block and complete it if it has minimum replicas
commitOrCompleteLastBlock(pendingFile,
ExtendedBlock.getLocalBlock(previous));
// allocate new block, record block locations in INode.
newBlock = createNewBlock();
saveAllocatedBlock(src, inodesInPath, newBlock, targets);
dir.persistBlocks(src, pendingFile);
offset = pendingFile.computeFileSize(true);
} finally {
writeUnlock();
}
if (persistBlocks) {
getEditLog().logSync();
}
// Return located block
return makeLocatedBlock(newBlock, targets, offset);
}
INodesInPath analyzeFileState(String src,
String clientName,
ExtendedBlock previous,
LocatedBlock[] onRetryBlock)
throws IOException {
assert hasReadOrWriteLock();
checkBlock(previous);
onRetryBlock[0] = null;
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot add block to " + src, safeMode);
}
@ -2224,12 +2293,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// have we exceeded the configured limit of fs objects.
checkFsObjectLimit();
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
final INodesInPath inodesInPath =
dir.rootDir.getExistingPathINodes(src, true);
final INode[] inodes = inodesInPath.getINodes();
final INodeFileUnderConstruction pendingFile
= checkLease(src, clientName, inodes[inodes.length - 1]);
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
// doesn't match up with what we think is the last block. There are
// three possibilities:
// four possibilities:
// 1) This is the first block allocation of an append() pipeline
// which started appending exactly at a block boundary.
// In this case, the client isn't passed the previous block,
@ -2241,10 +2315,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// timeout, or because of an HA failover. In that case, we know
// by the fact that the client is re-issuing the RPC that it
// never began to write to the old block. Hence it is safe to
// abandon it and allocate a new one.
// to return the existing block.
// 3) This is an entirely bogus request/bug -- we should error out
// rather than potentially appending a new block with an empty
// one in the middle, etc
// 4) This is a retry from a client that timed out while
// the prior getAdditionalBlock() is still being processed,
// currently working on chooseTarget().
// There are no means to distinguish between the first and
// the second attempts in Part I, because the first one hasn't
// changed the namesystem state yet.
// We run this analysis again in Part II where case 4 is impossible.
BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
if (previous == null &&
@ -2259,7 +2340,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
src + " lastBlock=" + lastBlockInFile);
}
} else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
// Case 2
if (lastBlockInFile.getNumBytes() != 0) {
throw new IOException(
"Request looked like a retry to allocate block " +
@ -2267,76 +2347,39 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
lastBlockInFile.getNumBytes() + " bytes");
}
// The retry case ("b" above) -- abandon the old block.
// Case 2
// Return the last block.
NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
"caught retry for allocation of a new block in " +
src + ". Abandoning old block " + lastBlockInFile);
dir.removeBlock(src, pendingFile, lastBlockInFile);
dir.persistBlocks(src, pendingFile);
src + ". Returning previously allocated block " + lastBlockInFile);
long offset = pendingFile.computeFileSize(true);
onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(),
offset);
return inodesInPath;
} else {
// Case 3
throw new IOException("Cannot allocate block in " + src + ": " +
"passed 'previous' block " + previous + " does not match actual " +
"last block in file " + lastBlockInFile);
}
}
// commit the last block and complete it if it has minimum replicas
commitOrCompleteLastBlock(pendingFile, previousBlock);
//
// If we fail this, bad things happen!
//
// Check if the penultimate block is minimally replicated
if (!checkFileProgress(pendingFile, false)) {
throw new NotReplicatedYetException("Not replicated yet: " + src);
}
fileLength = pendingFile.computeContentSummary().getLength();
blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();
replication = pendingFile.getBlockReplication();
} finally {
writeUnlock();
return inodesInPath;
}
// choose targets for the new block to be allocated.
final DatanodeDescriptor targets[] = blockManager.chooseTarget(
src, replication, clientNode, excludedNodes, blockSize);
// Allocate a new block and record it in the INode.
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot add block to " + src, safeMode);
}
final INodesInPath inodesInPath = dir.rootDir.getExistingPathINodes(src, true);
final INode[] inodes = inodesInPath.getINodes();
final INodeFileUnderConstruction pendingFile
= checkLease(src, clientName, inodes[inodes.length - 1]);
if (!checkFileProgress(pendingFile, false)) {
throw new NotReplicatedYetException("Not replicated yet:" + src);
}
// allocate new block record block locations in INode.
newBlock = allocateBlock(src, inodesInPath, targets);
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
}
dir.persistBlocks(src, pendingFile);
} finally {
writeUnlock();
}
if (persistBlocks) {
getEditLog().logSync();
}
// Create next block
LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
blockManager.setBlockToken(b, BlockTokenSecretManager.AccessMode.WRITE);
return b;
LocatedBlock makeLocatedBlock(Block blk,
DatanodeInfo[] locs,
long offset) throws IOException {
LocatedBlock lBlk = new LocatedBlock(
getExtendedBlock(blk), locs, offset);
getBlockManager().setBlockToken(
lBlk, BlockTokenSecretManager.AccessMode.WRITE);
return lBlk;
}
/** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
@ -2528,22 +2571,33 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
/**
* Allocate a block at the given pending filename
* Save allocated block at the given pending filename
*
* @param src path to the file
* @param inodesInPath representing each of the components of src.
* The last INode is the INode for the file.
* @throws QuotaExceededException If addition of block exceeds space quota
*/
private Block allocateBlock(String src, INodesInPath inodesInPath,
DatanodeDescriptor targets[]) throws IOException {
BlockInfo saveAllocatedBlock(String src, INodesInPath inodesInPath,
Block newBlock, DatanodeDescriptor targets[]) throws IOException {
assert hasWriteLock();
BlockInfo b = dir.addBlock(src, inodesInPath, newBlock, targets);
NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
+ getBlockPoolId() + " " + b);
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
}
return b;
}
/**
* Create new block with a unique block id and a new generation stamp.
*/
Block createNewBlock() throws IOException {
assert hasWriteLock();
Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0);
// Increment the generation stamp for every new block.
b.setGenerationStamp(nextGenerationStamp());
b = dir.addBlock(src, inodesInPath, b, targets);
NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
+ blockPoolId + " " + b);
return b;
}

View File

@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.lang.reflect.Field;
import java.util.EnumSet;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.net.Node;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Race between two threads simultaneously calling
* FSNamesystem.getAdditionalBlock().
*/
public class TestAddBlockRetry {
public static final Log LOG = LogFactory.getLog(TestAddBlockRetry.class);
private static final short REPLICATION = 3;
private Configuration conf;
private MiniDFSCluster cluster;
private int count = 0;
private LocatedBlock lb1;
private LocatedBlock lb2;
@Before
public void setUp() throws Exception {
conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(REPLICATION)
.build();
cluster.waitActive();
}
@After
public void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Retry addBlock() while another thread is in chooseTarget().
* See HDFS-4452.
*/
@Test
public void testRetryAddBlockWhileInChooseTarget() throws Exception {
final String src = "/testRetryAddBlockWhileInChooseTarget";
FSNamesystem ns = cluster.getNamesystem();
BlockManager spyBM = spy(ns.getBlockManager());
final NamenodeProtocols nn = cluster.getNameNodeRpc();
// substitute mocked BlockManager into FSNamesystem
Class<? extends FSNamesystem> nsClass = ns.getClass();
Field bmField = nsClass.getDeclaredField("blockManager");
bmField.setAccessible(true);
bmField.set(ns, spyBM);
doAnswer(new Answer<DatanodeDescriptor[]>() {
@Override
public DatanodeDescriptor[] answer(InvocationOnMock invocation)
throws Throwable {
LOG.info("chooseTarget for " + src);
DatanodeDescriptor[] ret =
(DatanodeDescriptor[]) invocation.callRealMethod();
count++;
if(count == 1) { // run second addBlock()
LOG.info("Starting second addBlock for " + src);
nn.addBlock(src, "clientName", null, null);
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
lb2 = lbs.get(0);
assertEquals("Wrong replication",
REPLICATION, lb2.getLocations().length);
}
return ret;
}
}).when(spyBM).chooseTarget(Mockito.anyString(), Mockito.anyInt(),
Mockito.<DatanodeDescriptor>any(), Mockito.<HashMap<Node, Node>>any(),
Mockito.anyLong());
// create file
nn.create(src, FsPermission.getFileDefault(),
"clientName",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
true, (short)3, 1024);
// start first addBlock()
LOG.info("Starting first addBlock for " + src);
nn.addBlock(src, "clientName", null, null);
// check locations
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
lb1 = lbs.get(0);
assertEquals("Wrong replication", REPLICATION, lb1.getLocations().length);
assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock());
}
}