diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 766d6ea9b17..5d6a8611d47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -545,6 +545,9 @@ Release 2.0.0 - UNRELEASED
     HDFS-3414. Balancer does not find NameNode if rpc-address or
     servicerpc-address are not set in client configs. (atm)
 
+    HDFS-3031. Fix complete() and getAdditionalBlock() RPCs to be idempotent
+    (todd)
+
   BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index c607ab05eab..039b40c7c7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -965,6 +965,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
       DatanodeInfo[] nodes = null;
       int count = dfsClient.getConf().nBlockWriteRetry;
       boolean success = false;
+      ExtendedBlock oldBlock = block;
       do {
         hasError = false;
         lastException = null;
@@ -972,9 +973,11 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
         success = false;
 
         long startTime = System.currentTimeMillis();
-        DatanodeInfo[] w = excludedNodes.toArray(
+        DatanodeInfo[] excluded = excludedNodes.toArray(
             new DatanodeInfo[excludedNodes.size()]);
-        lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
+        block = oldBlock;
+        lb = locateFollowingBlock(startTime,
+            excluded.length > 0 ? excluded : null);
         block = lb.getBlock();
         block.setNumBytes(0);
         accessToken = lb.getBlockToken();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java
index 33c86f96d89..20324be4831 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java
@@ -214,6 +214,17 @@ public class Block implements Writable, Comparable {
     }
     return compareTo((Block)o) == 0;
   }
+
+  /**
+   * @return true if the two blocks have the same block ID and the same
+   *         generation stamp, or if both blocks are null.
+   */
+  public static boolean matchingIdAndGenStamp(Block a, Block b) {
+    if (a == b) return true; // same block, or both null
+    if (a == null || b == null) return false; // only one null
+    return a.blockId == b.blockId &&
+           a.generationStamp == b.generationStamp;
+  }
 
   @Override // Object
   public int hashCode() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 26f309a28fe..9a024656029 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -309,6 +309,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If src contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public LocatedBlock addBlock(String src, String clientName,
       ExtendedBlock previous, DatanodeInfo[] excludeNodes)
       throws AccessControlException, FileNotFoundException,
@@ -362,6 +363,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If src contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public boolean complete(String src, String clientName, ExtendedBlock last)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f1072f9bfe3..9821049fc88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1868,6 +1868,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       QuotaExceededException, SafeModeException, UnresolvedLinkException,
       IOException {
     checkBlock(previous);
+    Block previousBlock = ExtendedBlock.getLocalBlock(previous);
     long fileLength, blockSize;
     int replication;
     DatanodeDescriptor clientNode = null;
@@ -1890,10 +1891,65 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       // have we exceeded the configured limit of fs objects.
       checkFsObjectLimit();
 
-      INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
+      INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
+      BlockInfo lastBlockInFile = pendingFile.getLastBlock();
+      if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
+        // The block that the client claims is the current last block
+        // doesn't match up with what we think is the last block. There are
+        // three possibilities:
+        // 1) This is the first block allocation of an append() pipeline
+        //    which started appending exactly at a block boundary.
+        //    In this case, the client isn't passed the previous block,
+        //    so it makes the allocateBlock() call with previous=null.
+        //    We can distinguish this since the last block of the file
+        //    will be exactly a full block.
+        // 2) This is a retry from a client that missed the response of a
+        //    prior getAdditionalBlock() call, perhaps because of a network
+        //    timeout, or because of an HA failover. In that case, we know
+        //    by the fact that the client is re-issuing the RPC that it
+        //    never began to write to the old block.
+        //    Hence it is safe to abandon it and allocate a new one.
+        // 3) This is an entirely bogus request/bug -- we should error out
+        //    rather than potentially appending a new block with an empty
+        //    one in the middle, etc.
+
+        BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+        if (previous == null &&
+            lastBlockInFile != null &&
+            lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
+            lastBlockInFile.isComplete()) {
+          // Case 1
+          if (NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug(
+                "BLOCK* NameSystem.allocateBlock: handling block allocation" +
+                " writing to a file with a complete previous block: src=" +
+                src + " lastBlock=" + lastBlockInFile);
+          }
+        } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
+          // Case 2
+          if (lastBlockInFile.getNumBytes() != 0) {
+            throw new IOException(
+                "Request looked like a retry to allocate block " +
+                lastBlockInFile + " but it already contains " +
+                lastBlockInFile.getNumBytes() + " bytes");
+          }
+
+          // The retry case (case 2 above) -- abandon the old block.
+          NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " +
+              "caught retry for allocation of a new block in " +
+              src + ". Abandoning old block " + lastBlockInFile);
+          dir.removeBlock(src, pendingFile, lastBlockInFile);
+          dir.persistBlocks(src, pendingFile);
+        } else {
+          throw new IOException("Cannot allocate block in " + src + ": " +
+              "passed 'previous' block " + previous + " does not match actual " +
+              "last block in file " + lastBlockInFile);
+        }
+      }
 
       // commit the last block and complete it if it has minimum replicas
-      commitOrCompleteLastBlock(pendingFile, ExtendedBlock.getLocalBlock(previous));
+      commitOrCompleteLastBlock(pendingFile, previousBlock);
 
       //
       // If we fail this, bad things happen!
@@ -2104,7 +2160,29 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new SafeModeException("Cannot complete file " + src, safeMode);
     }
 
-    INodeFileUnderConstruction pendingFile = checkLease(src, holder);
+    INodeFileUnderConstruction pendingFile;
+    try {
+      pendingFile = checkLease(src, holder);
+    } catch (LeaseExpiredException lee) {
+      INodeFile file = dir.getFileINode(src);
+      if (file != null && !file.isUnderConstruction()) {
+        // This could be a retry RPC - i.e. the client tried to close
+        // the file, but missed the RPC response. Thus, it is trying
+        // again to close the file. If the file still exists and
+        // the client's view of the last block matches the actual
+        // last block, then we'll treat it as a successful close.
+        // See HDFS-3031.
+        Block realLastBlock = file.getLastBlock();
+        if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
+          NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: " +
+              "received request from " + holder + " to complete file " + src +
+              " which is already closed. But, it appears to be an RPC " +
+              "retry. Returning success.");
+          return true;
+        }
+      }
+      throw lee;
+    }
 
     // commit the last block and complete it if it has minimum replicas
     commitOrCompleteLastBlock(pendingFile, last);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 11526a230c3..b418fcf5072 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1658,19 +1658,14 @@ public class MiniDFSCluster {
     return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
   }
 
-  private HAServiceProtocol getHaServiceClient(int nnIndex) throws IOException {
-    InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
-    return new HAServiceProtocolClientSideTranslatorPB(addr, conf);
-  }
-
   public void transitionToActive(int nnIndex) throws IOException,
       ServiceFailedException {
-    HAServiceProtocolHelper.transitionToActive(getHaServiceClient(nnIndex));
+    getNameNode(nnIndex).getRpcServer().transitionToActive();
   }
 
   public void transitionToStandby(int nnIndex) throws IOException,
       ServiceFailedException {
-    HAServiceProtocolHelper.transitionToStandby(getHaServiceClient(nnIndex));
+    getNameNode(nnIndex).getRpcServer().transitionToStandby();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index b1e353598cd..e04a6496b5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -63,10 +63,13 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.mockito.Mockito;
 import org.mockito.internal.stubbing.answers.ThrowsException;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.base.Joiner;
+
 /**
  * These tests make sure that DFSClient retries fetching data from DFS
  * properly in case of errors.
@@ -296,6 +299,100 @@ public class TestDFSClientRetries extends TestCase {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Test that getAdditionalBlock() and close() are idempotent. This allows
+   * a client to safely retry a call and still produce a correct
+   * file. See HDFS-3031.
+   */
+  public void testIdempotentAllocateBlockAndClose() throws Exception {
+    final String src = "/testIdempotentAllocateBlock";
+    Path file = new Path(src);
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+      NamenodeProtocols spyNN = spy(preSpyNN);
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+
+
+      // Make the call to addBlock() get called twice, as if it were retried
+      // due to an IPC issue.
+      doAnswer(new Answer() {
+        @Override
+        public LocatedBlock answer(InvocationOnMock invocation) throws Throwable {
+          LocatedBlock ret = (LocatedBlock) invocation.callRealMethod();
+          LocatedBlocks lb = cluster.getNameNodeRpc().getBlockLocations(src, 0, Long.MAX_VALUE);
+          int blockCount = lb.getLocatedBlocks().size();
+          assertEquals(lb.getLastLocatedBlock().getBlock(), ret.getBlock());
+
+          // Retrying should result in a new block at the end of the file.
+          // (abandoning the old one)
+          LocatedBlock ret2 = (LocatedBlock) invocation.callRealMethod();
+          lb = cluster.getNameNodeRpc().getBlockLocations(src, 0, Long.MAX_VALUE);
+          int blockCount2 = lb.getLocatedBlocks().size();
+          assertEquals(lb.getLastLocatedBlock().getBlock(), ret2.getBlock());
+
+          // We shouldn't have gained an extra block by the RPC.
+          assertEquals(blockCount, blockCount2);
+          return (LocatedBlock) ret2;
+        }
+      }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
+          Mockito.any(), Mockito.any());
+
+      doAnswer(new Answer() {
+
+        @Override
+        public Boolean answer(InvocationOnMock invocation) throws Throwable {
+          // complete() may return false a few times before it returns
+          // true. We want to wait until it returns true, and then
+          // make it retry one more time after that.
+          LOG.info("Called complete(" +
+              Joiner.on(",").join(invocation.getArguments()) + ")");
+          if (!(Boolean)invocation.callRealMethod()) {
+            LOG.info("Complete call returned false, not faking a retry RPC");
+            return false;
+          }
+          // We got a successful close. Call it again to check idempotence.
+          try {
+            boolean ret = (Boolean) invocation.callRealMethod();
+            LOG.info("Complete call returned true, faked second RPC. " +
+                "Returned: " + ret);
+            return ret;
+          } catch (Throwable t) {
+            LOG.error("Idempotent retry threw exception", t);
+            throw t;
+          }
+        }
+      }).when(spyNN).complete(Mockito.anyString(), Mockito.anyString(),
+          Mockito.any());
+
+      OutputStream stm = client.create(file.toString(), true);
+      try {
+        AppendTestUtil.write(stm, 0, 10000);
+        stm.close();
+        stm = null;
+      } finally {
+        IOUtils.cleanup(LOG, stm);
+      }
+
+      // Make sure the mock was actually properly injected.
+      Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
+          Mockito.anyString(), Mockito.anyString(),
+          Mockito.any(), Mockito.any());
+      Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
+          Mockito.anyString(), Mockito.anyString(),
+          Mockito.any());
+
+      AppendTestUtil.check(fs, file, 10000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 
   /**
    * Mock Answer implementation of NN.getBlockLocations that will return
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
index 40c0a1a3ed4..038edd8d2bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
@@ -21,10 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -43,9 +39,13 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 /** This class implements some of tests posted in HADOOP-2658. */
-public class TestFileAppend3 extends junit.framework.TestCase {
+public class TestFileAppend3 {
   {
     ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
@@ -64,29 +64,28 @@ public class TestFileAppend3 extends junit.framework.TestCase {
   private static MiniDFSCluster cluster;
   private static DistributedFileSystem fs;
 
-  public static Test suite() {
-    return new TestSetup(new TestSuite(TestFileAppend3.class)) {
-      protected void setUp() throws java.lang.Exception {
-        AppendTestUtil.LOG.info("setUp()");
-        conf = new HdfsConfiguration();
-        conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
-        buffersize = conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
-        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
-        fs = (DistributedFileSystem)cluster.getFileSystem();
-      }
-
-      protected void tearDown() throws Exception {
-        AppendTestUtil.LOG.info("tearDown()");
-        if(fs != null) fs.close();
-        if(cluster != null) cluster.shutdown();
-      }
-    };
+  @BeforeClass
+  public static void setUp() throws java.lang.Exception {
+    AppendTestUtil.LOG.info("setUp()");
+    conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
+    buffersize = conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
+    fs = (DistributedFileSystem)cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    AppendTestUtil.LOG.info("tearDown()");
+    if(fs != null) fs.close();
+    if(cluster != null) cluster.shutdown();
   }
 
   /**
   * TC1: Append on block boundary.
   * @throws IOException an exception might be thrown
   */
+  @Test
   public void testTC1() throws Exception {
     final Path p = new Path("/TC1/foo");
     System.out.println("p=" + p);
@@ -115,6 +114,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    * TC2: Append on non-block boundary.
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testTC2() throws Exception {
     final Path p = new Path("/TC2/foo");
     System.out.println("p=" + p);
@@ -145,6 +145,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    * TC5: Only one simultaneous append.
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testTC5() throws Exception {
     final Path p = new Path("/TC5/foo");
     System.out.println("p=" + p);
@@ -175,6 +176,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    * TC7: Corrupted replicas are present.
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testTC7() throws Exception {
     final short repl = 2;
     final Path p = new Path("/TC7/foo");
@@ -224,6 +226,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    * TC11: Racing rename
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testTC11() throws Exception {
     final Path p = new Path("/TC11/foo");
     System.out.println("p=" + p);
@@ -282,6 +285,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    * TC12: Append to partial CRC chunk
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testTC12() throws Exception {
     final Path p = new Path("/TC12/foo");
     System.out.println("p=" + p);
@@ -313,6 +317,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    *
    * @throws IOException
    */
+  @Test
   public void testAppendToPartialChunk() throws IOException {
     final Path p = new Path("/partialChunk/foo");
     final int fileLen = 513;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
index 815be593599..a1e8f29e2fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
@@ -85,12 +85,52 @@ public class TestPipelinesFailover {
   private static final int STRESS_NUM_THREADS = 25;
   private static final int STRESS_RUNTIME = 40000;
+
+  enum TestScenario {
+    GRACEFUL_FAILOVER {
+      void run(MiniDFSCluster cluster) throws IOException {
+        cluster.transitionToStandby(0);
+        cluster.transitionToActive(1);
+      }
+    },
+    ORIGINAL_ACTIVE_CRASHED {
+      void run(MiniDFSCluster cluster) throws IOException {
+        cluster.restartNameNode(0);
+        cluster.transitionToActive(1);
+      }
+    };
+
+    abstract void run(MiniDFSCluster cluster) throws IOException;
+  }
+
+  enum MethodToTestIdempotence {
+    ALLOCATE_BLOCK,
+    COMPLETE_FILE;
+  }
 
   /**
    * Tests continuing a write pipeline over a failover.
   */
  @Test(timeout=30000)
-  public void testWriteOverFailover() throws Exception {
+  public void testWriteOverGracefulFailover() throws Exception {
+    doWriteOverFailoverTest(TestScenario.GRACEFUL_FAILOVER,
+        MethodToTestIdempotence.ALLOCATE_BLOCK);
+  }
+
+  @Test(timeout=30000)
+  public void testAllocateBlockAfterCrashFailover() throws Exception {
+    doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED,
+        MethodToTestIdempotence.ALLOCATE_BLOCK);
+  }
+
+  @Test(timeout=30000)
+  public void testCompleteFileAfterCrashFailover() throws Exception {
+    doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED,
+        MethodToTestIdempotence.COMPLETE_FILE);
+  }
+
+  private void doWriteOverFailoverTest(TestScenario scenario,
+      MethodToTestIdempotence methodToTest) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    // Don't check replication periodically.
@@ -102,6 +142,8 @@ public class TestPipelinesFailover {
         .numDataNodes(3)
         .build();
     try {
+      int sizeWritten = 0;
+
       cluster.waitActive();
       cluster.transitionToActive(0);
       Thread.sleep(500);
@@ -112,28 +154,39 @@ public class TestPipelinesFailover {
 
       // write a block and a half
       AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
+      sizeWritten += BLOCK_AND_A_HALF;
 
       // Make sure all of the blocks are written out before failover.
       stm.hflush();
 
       LOG.info("Failing over to NN 1");
-      cluster.transitionToStandby(0);
-      cluster.transitionToActive(1);
+      scenario.run(cluster);
 
-      assertTrue(fs.exists(TEST_PATH));
+      // NOTE: explicitly do *not* make any further metadata calls
+      // to the NN here. The next IPC call should be to allocate the next
+      // block. Any other call would notice the failover and not test
+      // idempotence of the operation (HDFS-3031)
+
       FSNamesystem ns1 = cluster.getNameNode(1).getNamesystem();
       BlockManagerTestUtil.updateState(ns1.getBlockManager());
       assertEquals(0, ns1.getPendingReplicationBlocks());
       assertEquals(0, ns1.getCorruptReplicaBlocks());
       assertEquals(0, ns1.getMissingBlocksCount());
 
-      // write another block and a half
-      AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
-
+      // If we're testing allocateBlock()'s idempotence, write another
+      // block and a half, so we have to allocate a new block.
+      // Otherwise, don't write anything, so our next RPC will be
+      // completeFile() if we're testing idempotence of that operation.
+      if (methodToTest == MethodToTestIdempotence.ALLOCATE_BLOCK) {
+        // write another block and a half
+        AppendTestUtil.write(stm, sizeWritten, BLOCK_AND_A_HALF);
+        sizeWritten += BLOCK_AND_A_HALF;
+      }
+
       stm.close();
       stm = null;
 
-      AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE * 3);
+      AppendTestUtil.check(fs, TEST_PATH, sizeWritten);
     } finally {
       IOUtils.closeStream(stm);
       cluster.shutdown();
@@ -146,7 +199,18 @@ public class TestPipelinesFailover {
    * even when the pipeline was constructed on a different NN.
   */
  @Test(timeout=30000)
-  public void testWriteOverFailoverWithDnFail() throws Exception {
+  public void testWriteOverGracefulFailoverWithDnFail() throws Exception {
+    doTestWriteOverFailoverWithDnFail(TestScenario.GRACEFUL_FAILOVER);
+  }
+
+  @Test(timeout=30000)
+  public void testWriteOverCrashFailoverWithDnFail() throws Exception {
+    doTestWriteOverFailoverWithDnFail(TestScenario.ORIGINAL_ACTIVE_CRASHED);
+  }
+
+
+  private void doTestWriteOverFailoverWithDnFail(TestScenario scenario)
+      throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
 
@@ -171,8 +235,7 @@ public class TestPipelinesFailover {
       stm.hflush();
 
       LOG.info("Failing over to NN 1");
-      cluster.transitionToStandby(0);
-      cluster.transitionToActive(1);
+      scenario.run(cluster);
 
       assertTrue(fs.exists(TEST_PATH));
 
@@ -183,8 +246,8 @@ public class TestPipelinesFailover {
       stm.hflush();
 
       LOG.info("Failing back to NN 0");
-      cluster.transitionToStandby(0);
-      cluster.transitionToActive(1);
+      cluster.transitionToStandby(1);
+      cluster.transitionToActive(0);
 
       cluster.stopDataNode(1);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStateTransitionFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStateTransitionFailure.java
index bf1ca52b79d..3af78a55b98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStateTransitionFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStateTransitionFailure.java
@@ -67,8 +67,8 @@ public class TestStateTransitionFailure {
         fail("Transitioned to active but should not have been able to.");
       } catch (ServiceFailedException sfe) {
         assertExceptionContains("Error encountered requiring NN shutdown. " +
-            "Shutting down immediately.", sfe);
-        LOG.info("got expected exception", sfe);
+            "Shutting down immediately.", sfe.getCause());
+        LOG.info("got expected exception", sfe.getCause());
       }
       verify(mockRuntime, times(1)).exit(anyInt());
     } finally {