HDFS-3031. Fix complete() and getAdditionalBlock() RPCs to be idempotent. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1338466 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-05-14 22:35:25 +00:00
parent 05c5fcfb42
commit bcdb125643
10 changed files with 306 additions and 49 deletions

View File

@ -661,6 +661,9 @@ Release 2.0.0 - UNRELEASED
HDFS-3414. Balancer does not find NameNode if rpc-address or
servicerpc-address are not set in client configs. (atm)
HDFS-3031. Fix complete() and getAdditionalBlock() RPCs to be idempotent
(todd)
BREAKDOWN OF HDFS-1623 SUBTASKS
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)

View File

@ -965,6 +965,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
DatanodeInfo[] nodes = null;
int count = dfsClient.getConf().nBlockWriteRetry;
boolean success = false;
ExtendedBlock oldBlock = block;
do {
hasError = false;
lastException = null;
@ -972,9 +973,11 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
success = false;
long startTime = System.currentTimeMillis();
DatanodeInfo[] w = excludedNodes.toArray(
DatanodeInfo[] excluded = excludedNodes.toArray(
new DatanodeInfo[excludedNodes.size()]);
lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
block = oldBlock;
lb = locateFollowingBlock(startTime,
excluded.length > 0 ? excluded : null);
block = lb.getBlock();
block.setNumBytes(0);
accessToken = lb.getBlockToken();

View File

@ -214,6 +214,17 @@ public class Block implements Writable, Comparable<Block> {
}
return compareTo((Block)o) == 0;
}
/**
* @return true if the two blocks have the same block ID and the same
* generation stamp, or if both blocks are null.
*/
public static boolean matchingIdAndGenStamp(Block a, Block b) {
if (a == b) return true; // same block, or both null
if (a == null || b == null) return false; // only one null
return a.blockId == b.blockId &&
a.generationStamp == b.generationStamp;
}
@Override // Object
public int hashCode() {

View File

@ -309,6 +309,7 @@ public interface ClientProtocol {
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
@Idempotent
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes)
throws AccessControlException, FileNotFoundException,
@ -362,6 +363,7 @@ public interface ClientProtocol {
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
@Idempotent
public boolean complete(String src, String clientName, ExtendedBlock last)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException;

View File

@ -1883,6 +1883,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
QuotaExceededException, SafeModeException, UnresolvedLinkException,
IOException {
checkBlock(previous);
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
long fileLength, blockSize;
int replication;
DatanodeDescriptor clientNode = null;
@ -1905,10 +1906,65 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// have we exceeded the configured limit of fs objects.
checkFsObjectLimit();
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
// doesn't match up with what we think is the last block. There are
// three possibilities:
// 1) This is the first block allocation of an append() pipeline
// which started appending exactly at a block boundary.
// In this case, the client isn't passed the previous block,
// so it makes the allocateBlock() call with previous=null.
// We can distinguish this since the last block of the file
// will be exactly a full block.
// 2) This is a retry from a client that missed the response of a
// prior getAdditionalBlock() call, perhaps because of a network
// timeout, or because of an HA failover. In that case, we know
// by the fact that the client is re-issuing the RPC that it
// never began to write to the old block. Hence it is safe to
// abandon it and allocate a new one.
// 3) This is an entirely bogus request/bug -- we should error out
// rather than potentially appending a new block with an empty
// one in the middle, etc
BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
if (previous == null &&
lastBlockInFile != null &&
lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
lastBlockInFile.isComplete()) {
// Case 1
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.allocateBlock: handling block allocation" +
" writing to a file with a complete previous block: src=" +
src + " lastBlock=" + lastBlockInFile);
}
} else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
// Case 2
if (lastBlockInFile.getNumBytes() != 0) {
throw new IOException(
"Request looked like a retry to allocate block " +
lastBlockInFile + " but it already contains " +
lastBlockInFile.getNumBytes() + " bytes");
}
// The retry case ("b" above) -- abandon the old block.
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " +
"caught retry for allocation of a new block in " +
src + ". Abandoning old block " + lastBlockInFile);
dir.removeBlock(src, pendingFile, lastBlockInFile);
dir.persistBlocks(src, pendingFile);
} else {
throw new IOException("Cannot allocate block in " + src + ": " +
"passed 'previous' block " + previous + " does not match actual " +
"last block in file " + lastBlockInFile);
}
}
// commit the last block and complete it if it has minimum replicas
commitOrCompleteLastBlock(pendingFile, ExtendedBlock.getLocalBlock(previous));
commitOrCompleteLastBlock(pendingFile, previousBlock);
//
// If we fail this, bad things happen!
@ -2119,7 +2175,29 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot complete file " + src, safeMode);
}
INodeFileUnderConstruction pendingFile = checkLease(src, holder);
INodeFileUnderConstruction pendingFile;
try {
pendingFile = checkLease(src, holder);
} catch (LeaseExpiredException lee) {
INodeFile file = dir.getFileINode(src);
if (file != null && !file.isUnderConstruction()) {
// This could be a retry RPC - i.e the client tried to close
// the file, but missed the RPC response. Thus, it is trying
// again to close the file. If the file still exists and
// the client's view of the last block matches the actual
// last block, then we'll treat it as a successful close.
// See HDFS-3031.
Block realLastBlock = file.getLastBlock();
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: " +
"received request from " + holder + " to complete file " + src +
" which is already closed. But, it appears to be an RPC " +
"retry. Returning success.");
return true;
}
}
throw lee;
}
// commit the last block and complete it if it has minimum replicas
commitOrCompleteLastBlock(pendingFile, last);

View File

@ -1658,19 +1658,14 @@ public class MiniDFSCluster {
return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
}
private HAServiceProtocol getHaServiceClient(int nnIndex) throws IOException {
InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
return new HAServiceProtocolClientSideTranslatorPB(addr, conf);
}
public void transitionToActive(int nnIndex) throws IOException,
ServiceFailedException {
HAServiceProtocolHelper.transitionToActive(getHaServiceClient(nnIndex));
getNameNode(nnIndex).getRpcServer().transitionToActive();
}
public void transitionToStandby(int nnIndex) throws IOException,
ServiceFailedException {
HAServiceProtocolHelper.transitionToStandby(getHaServiceClient(nnIndex));
getNameNode(nnIndex).getRpcServer().transitionToStandby();
}

View File

@ -65,10 +65,13 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.ThrowsException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Joiner;
/**
* These tests make sure that DFSClient retries fetching data from DFS
* properly in case of errors.
@ -298,6 +301,100 @@ public class TestDFSClientRetries extends TestCase {
cluster.shutdown();
}
}
/**
* Test that getAdditionalBlock() and close() are idempotent. This allows
* a client to safely retry a call and still produce a correct
* file. See HDFS-3031.
*/
public void testIdempotentAllocateBlockAndClose() throws Exception {
final String src = "/testIdempotentAllocateBlock";
Path file = new Path(src);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
NamenodeProtocols spyNN = spy(preSpyNN);
DFSClient client = new DFSClient(null, spyNN, conf, null);
// Make the call to addBlock() get called twice, as if it were retried
// due to an IPC issue.
doAnswer(new Answer<LocatedBlock>() {
@Override
public LocatedBlock answer(InvocationOnMock invocation) throws Throwable {
LocatedBlock ret = (LocatedBlock) invocation.callRealMethod();
LocatedBlocks lb = cluster.getNameNodeRpc().getBlockLocations(src, 0, Long.MAX_VALUE);
int blockCount = lb.getLocatedBlocks().size();
assertEquals(lb.getLastLocatedBlock().getBlock(), ret.getBlock());
// Retrying should result in a new block at the end of the file.
// (abandoning the old one)
LocatedBlock ret2 = (LocatedBlock) invocation.callRealMethod();
lb = cluster.getNameNodeRpc().getBlockLocations(src, 0, Long.MAX_VALUE);
int blockCount2 = lb.getLocatedBlocks().size();
assertEquals(lb.getLastLocatedBlock().getBlock(), ret2.getBlock());
// We shouldn't have gained an extra block by the RPC.
assertEquals(blockCount, blockCount2);
return (LocatedBlock) ret2;
}
}).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
doAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
// complete() may return false a few times before it returns
// true. We want to wait until it returns true, and then
// make it retry one more time after that.
LOG.info("Called complete(: " +
Joiner.on(",").join(invocation.getArguments()) + ")");
if (!(Boolean)invocation.callRealMethod()) {
LOG.info("Complete call returned false, not faking a retry RPC");
return false;
}
// We got a successful close. Call it again to check idempotence.
try {
boolean ret = (Boolean) invocation.callRealMethod();
LOG.info("Complete call returned true, faked second RPC. " +
"Returned: " + ret);
return ret;
} catch (Throwable t) {
LOG.error("Idempotent retry threw exception", t);
throw t;
}
}
}).when(spyNN).complete(Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any());
OutputStream stm = client.create(file.toString(), true);
try {
AppendTestUtil.write(stm, 0, 10000);
stm.close();
stm = null;
} finally {
IOUtils.cleanup(LOG, stm);
}
// Make sure the mock was actually properly injected.
Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any());
AppendTestUtil.check(fs, file, 10000);
} finally {
cluster.shutdown();
}
}
/**
* Mock Answer implementation of NN.getBlockLocations that will return

View File

@ -21,10 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestSuite;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
@ -43,9 +39,13 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
/** This class implements some of tests posted in HADOOP-2658. */
public class TestFileAppend3 extends junit.framework.TestCase {
public class TestFileAppend3 {
{
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
@ -64,29 +64,28 @@ public class TestFileAppend3 extends junit.framework.TestCase {
private static MiniDFSCluster cluster;
private static DistributedFileSystem fs;
public static Test suite() {
return new TestSetup(new TestSuite(TestFileAppend3.class)) {
protected void setUp() throws java.lang.Exception {
AppendTestUtil.LOG.info("setUp()");
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
buffersize = conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
fs = (DistributedFileSystem)cluster.getFileSystem();
}
protected void tearDown() throws Exception {
AppendTestUtil.LOG.info("tearDown()");
if(fs != null) fs.close();
if(cluster != null) cluster.shutdown();
}
};
@BeforeClass
public static void setUp() throws java.lang.Exception {
AppendTestUtil.LOG.info("setUp()");
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
buffersize = conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
fs = (DistributedFileSystem)cluster.getFileSystem();
}
@AfterClass
public static void tearDown() throws Exception {
AppendTestUtil.LOG.info("tearDown()");
if(fs != null) fs.close();
if(cluster != null) cluster.shutdown();
}
/**
* TC1: Append on block boundary.
* @throws IOException an exception might be thrown
*/
@Test
public void testTC1() throws Exception {
final Path p = new Path("/TC1/foo");
System.out.println("p=" + p);
@ -115,6 +114,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
* TC2: Append on non-block boundary.
* @throws IOException an exception might be thrown
*/
@Test
public void testTC2() throws Exception {
final Path p = new Path("/TC2/foo");
System.out.println("p=" + p);
@ -145,6 +145,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
* TC5: Only one simultaneous append.
* @throws IOException an exception might be thrown
*/
@Test
public void testTC5() throws Exception {
final Path p = new Path("/TC5/foo");
System.out.println("p=" + p);
@ -175,6 +176,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
* TC7: Corrupted replicas are present.
* @throws IOException an exception might be thrown
*/
@Test
public void testTC7() throws Exception {
final short repl = 2;
final Path p = new Path("/TC7/foo");
@ -224,6 +226,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
* TC11: Racing rename
* @throws IOException an exception might be thrown
*/
@Test
public void testTC11() throws Exception {
final Path p = new Path("/TC11/foo");
System.out.println("p=" + p);
@ -282,6 +285,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
* TC12: Append to partial CRC chunk
* @throws IOException an exception might be thrown
*/
@Test
public void testTC12() throws Exception {
final Path p = new Path("/TC12/foo");
System.out.println("p=" + p);
@ -313,6 +317,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
* *
* @throws IOException
*/
@Test
public void testAppendToPartialChunk() throws IOException {
final Path p = new Path("/partialChunk/foo");
final int fileLen = 513;

View File

@ -85,12 +85,52 @@ public class TestPipelinesFailover {
private static final int STRESS_NUM_THREADS = 25;
private static final int STRESS_RUNTIME = 40000;
enum TestScenario {
GRACEFUL_FAILOVER {
void run(MiniDFSCluster cluster) throws IOException {
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
}
},
ORIGINAL_ACTIVE_CRASHED {
void run(MiniDFSCluster cluster) throws IOException {
cluster.restartNameNode(0);
cluster.transitionToActive(1);
}
};
abstract void run(MiniDFSCluster cluster) throws IOException;
}
enum MethodToTestIdempotence {
ALLOCATE_BLOCK,
COMPLETE_FILE;
}
/**
* Tests continuing a write pipeline over a failover.
*/
@Test(timeout=30000)
public void testWriteOverFailover() throws Exception {
public void testWriteOverGracefulFailover() throws Exception {
doWriteOverFailoverTest(TestScenario.GRACEFUL_FAILOVER,
MethodToTestIdempotence.ALLOCATE_BLOCK);
}
@Test(timeout=30000)
public void testAllocateBlockAfterCrashFailover() throws Exception {
doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED,
MethodToTestIdempotence.ALLOCATE_BLOCK);
}
@Test(timeout=30000)
public void testCompleteFileAfterCrashFailover() throws Exception {
doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED,
MethodToTestIdempotence.COMPLETE_FILE);
}
private void doWriteOverFailoverTest(TestScenario scenario,
MethodToTestIdempotence methodToTest) throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
// Don't check replication periodically.
@ -102,6 +142,8 @@ public class TestPipelinesFailover {
.numDataNodes(3)
.build();
try {
int sizeWritten = 0;
cluster.waitActive();
cluster.transitionToActive(0);
Thread.sleep(500);
@ -112,28 +154,39 @@ public class TestPipelinesFailover {
// write a block and a half
AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
sizeWritten += BLOCK_AND_A_HALF;
// Make sure all of the blocks are written out before failover.
stm.hflush();
LOG.info("Failing over to NN 1");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
scenario.run(cluster);
assertTrue(fs.exists(TEST_PATH));
// NOTE: explicitly do *not* make any further metadata calls
// to the NN here. The next IPC call should be to allocate the next
// block. Any other call would notice the failover and not test
// idempotence of the operation (HDFS-3031)
FSNamesystem ns1 = cluster.getNameNode(1).getNamesystem();
BlockManagerTestUtil.updateState(ns1.getBlockManager());
assertEquals(0, ns1.getPendingReplicationBlocks());
assertEquals(0, ns1.getCorruptReplicaBlocks());
assertEquals(0, ns1.getMissingBlocksCount());
// write another block and a half
AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
// If we're testing allocateBlock()'s idempotence, write another
// block and a half, so we have to allocate a new block.
// Otherise, don't write anything, so our next RPC will be
// completeFile() if we're testing idempotence of that operation.
if (methodToTest == MethodToTestIdempotence.ALLOCATE_BLOCK) {
// write another block and a half
AppendTestUtil.write(stm, sizeWritten, BLOCK_AND_A_HALF);
sizeWritten += BLOCK_AND_A_HALF;
}
stm.close();
stm = null;
AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE * 3);
AppendTestUtil.check(fs, TEST_PATH, sizeWritten);
} finally {
IOUtils.closeStream(stm);
cluster.shutdown();
@ -146,7 +199,18 @@ public class TestPipelinesFailover {
* even when the pipeline was constructed on a different NN.
*/
@Test(timeout=30000)
public void testWriteOverFailoverWithDnFail() throws Exception {
public void testWriteOverGracefulFailoverWithDnFail() throws Exception {
doTestWriteOverFailoverWithDnFail(TestScenario.GRACEFUL_FAILOVER);
}
@Test(timeout=30000)
public void testWriteOverCrashFailoverWithDnFail() throws Exception {
doTestWriteOverFailoverWithDnFail(TestScenario.ORIGINAL_ACTIVE_CRASHED);
}
private void doTestWriteOverFailoverWithDnFail(TestScenario scenario)
throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@ -171,8 +235,7 @@ public class TestPipelinesFailover {
stm.hflush();
LOG.info("Failing over to NN 1");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
scenario.run(cluster);
assertTrue(fs.exists(TEST_PATH));
@ -183,8 +246,8 @@ public class TestPipelinesFailover {
stm.hflush();
LOG.info("Failing back to NN 0");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
cluster.stopDataNode(1);

View File

@ -67,8 +67,8 @@ public class TestStateTransitionFailure {
fail("Transitioned to active but should not have been able to.");
} catch (ServiceFailedException sfe) {
assertExceptionContains("Error encountered requiring NN shutdown. " +
"Shutting down immediately.", sfe);
LOG.info("got expected exception", sfe);
"Shutting down immediately.", sfe.getCause());
LOG.info("got expected exception", sfe.getCause());
}
verify(mockRuntime, times(1)).exit(anyInt());
} finally {