HDFS-3031. Fix complete() and getAdditionalBlock() RPCs to be idempotent. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1338467 13f79535-47bb-0310-9956-ffa450edef68

parent c0bacb578b
commit b81b987be8
@@ -545,6 +545,9 @@ Release 2.0.0 - UNRELEASED
     HDFS-3414. Balancer does not find NameNode if rpc-address or
     servicerpc-address are not set in client configs. (atm)
 
+    HDFS-3031. Fix complete() and getAdditionalBlock() RPCs to be idempotent
+    (todd)
+
   BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
@@ -965,6 +965,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
     DatanodeInfo[] nodes = null;
     int count = dfsClient.getConf().nBlockWriteRetry;
     boolean success = false;
+    ExtendedBlock oldBlock = block;
     do {
       hasError = false;
       lastException = null;
@@ -972,9 +973,11 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
       success = false;
 
       long startTime = System.currentTimeMillis();
-      DatanodeInfo[] w = excludedNodes.toArray(
+      DatanodeInfo[] excluded = excludedNodes.toArray(
           new DatanodeInfo[excludedNodes.size()]);
-      lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
+      block = oldBlock;
+      lb = locateFollowingBlock(startTime,
+          excluded.length > 0 ? excluded : null);
       block = lb.getBlock();
       block.setNumBytes(0);
       accessToken = lb.getBlockToken();
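The block = oldBlock reset above is the client half of the fix: every pass through the retry loop now reports the same previous block to the NameNode, which is what lets the server-side retry detection (further down in this commit) recognize a re-issued allocation. A minimal sketch of the pattern, with hypothetical names, not the actual DFSOutputStream code:

    // Sketch: retrying an allocation RPC idempotently. The parameters
    // (namenode, src, clientName, lastAckedBlock, maxRetries) are assumed
    // inputs; only the addBlock() signature comes from this commit.
    static LocatedBlock allocateWithRetry(ClientProtocol namenode, String src,
        String clientName, ExtendedBlock lastAckedBlock, int maxRetries)
        throws IOException {
      LocatedBlock lb = null;
      for (int attempt = 0; attempt < maxRetries && lb == null; attempt++) {
        try {
          // Every attempt passes the *same* previous block, so attempt N+1
          // is recognizable as a retry of attempt N (HDFS-3031).
          lb = namenode.addBlock(src, clientName, lastAckedBlock, null);
        } catch (IOException e) {
          // e.g. a network timeout or an HA failover; now safe to re-issue
        }
      }
      return lb;
    }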
@@ -215,6 +215,17 @@ public class Block implements Writable, Comparable<Block> {
     return compareTo((Block)o) == 0;
   }
 
+  /**
+   * @return true if the two blocks have the same block ID and the same
+   * generation stamp, or if both blocks are null.
+   */
+  public static boolean matchingIdAndGenStamp(Block a, Block b) {
+    if (a == b) return true; // same block, or both null
+    if (a == null || b == null) return false; // only one null
+    return a.blockId == b.blockId &&
+        a.generationStamp == b.generationStamp;
+  }
+
   @Override // Object
   public int hashCode() {
     //GenerationStamp is IRRELEVANT and should not be used here
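Unlike hashCode(), which per its comment deliberately ignores the generation stamp, the new helper matches on both the block ID and the generation stamp, and it is safe to call with nulls. A quick illustration of the contract, with made-up values:

    Block a = new Block(1000L, 0L, 5L);   // id=1000, length=0, genstamp=5
    Block b = new Block(1000L, 0L, 5L);
    Block c = new Block(1000L, 0L, 6L);   // same id, newer genstamp

    Block.matchingIdAndGenStamp(a, b);       // true: id and genstamp match
    Block.matchingIdAndGenStamp(a, c);       // false: genstamp differs
    Block.matchingIdAndGenStamp(null, null); // true: both null
    Block.matchingIdAndGenStamp(a, null);    // false: only one is null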
@@ -309,6 +309,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public LocatedBlock addBlock(String src, String clientName,
       ExtendedBlock previous, DatanodeInfo[] excludeNodes)
       throws AccessControlException, FileNotFoundException,
@@ -362,6 +363,7 @@ public interface ClientProtocol {
    * @throws UnresolvedLinkException If <code>src</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
+  @Idempotent
   public boolean complete(String src, String clientName, ExtendedBlock last)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException;
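The @Idempotent annotation (from org.apache.hadoop.io.retry) is what allows the client-side retry and HA failover proxies to re-invoke these methods after a failure where it is unknown whether the server applied the call; unannotated methods are not retried in that situation. Conceptually, the proxy's check amounts to a reflective test like the following, an illustrative sketch rather than the actual Hadoop retry code:

    import java.lang.reflect.Method;
    import org.apache.hadoop.io.retry.Idempotent;

    class RetryCheck {
      // Only methods explicitly marked idempotent may be safely re-invoked
      // when the outcome of the first invocation is unknown.
      static boolean safeToRetry(Method m) {
        return m.isAnnotationPresent(Idempotent.class);
      }
    }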
@@ -1868,6 +1868,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       QuotaExceededException, SafeModeException, UnresolvedLinkException,
       IOException {
     checkBlock(previous);
+    Block previousBlock = ExtendedBlock.getLocalBlock(previous);
     long fileLength, blockSize;
     int replication;
     DatanodeDescriptor clientNode = null;
@@ -1891,9 +1892,64 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       checkFsObjectLimit();
 
       INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
+      BlockInfo lastBlockInFile = pendingFile.getLastBlock();
+      if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
+        // The block that the client claims is the current last block
+        // doesn't match up with what we think is the last block. There are
+        // three possibilities:
+        // 1) This is the first block allocation of an append() pipeline
+        //    which started appending exactly at a block boundary.
+        //    In this case, the client isn't passed the previous block,
+        //    so it makes the allocateBlock() call with previous=null.
+        //    We can distinguish this since the last block of the file
+        //    will be exactly a full block.
+        // 2) This is a retry from a client that missed the response of a
+        //    prior getAdditionalBlock() call, perhaps because of a network
+        //    timeout, or because of an HA failover. In that case, we know
+        //    by the fact that the client is re-issuing the RPC that it
+        //    never began to write to the old block. Hence it is safe to
+        //    abandon it and allocate a new one.
+        // 3) This is an entirely bogus request/bug -- we should error out
+        //    rather than potentially appending a new block with an empty
+        //    one in the middle, etc
+
+        BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+        if (previous == null &&
+            lastBlockInFile != null &&
+            lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
+            lastBlockInFile.isComplete()) {
+          // Case 1
+          if (NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug(
+                "BLOCK* NameSystem.allocateBlock: handling block allocation" +
+                " writing to a file with a complete previous block: src=" +
+                src + " lastBlock=" + lastBlockInFile);
+          }
+        } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
+          // Case 2
+          if (lastBlockInFile.getNumBytes() != 0) {
+            throw new IOException(
+                "Request looked like a retry to allocate block " +
+                lastBlockInFile + " but it already contains " +
+                lastBlockInFile.getNumBytes() + " bytes");
+          }
+
+          // The retry case (case 2 above) -- abandon the old block.
+          NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " +
+              "caught retry for allocation of a new block in " +
+              src + ". Abandoning old block " + lastBlockInFile);
+          dir.removeBlock(src, pendingFile, lastBlockInFile);
+          dir.persistBlocks(src, pendingFile);
+        } else {
+
+          throw new IOException("Cannot allocate block in " + src + ": " +
+              "passed 'previous' block " + previous + " does not match actual " +
+              "last block in file " + lastBlockInFile);
+        }
+      }
+
       // commit the last block and complete it if it has minimum replicas
-      commitOrCompleteLastBlock(pendingFile, ExtendedBlock.getLocalBlock(previous));
+      commitOrCompleteLastBlock(pendingFile, previousBlock);
 
       //
       // If we fail this, bad things happen!
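Condensed, the new getAdditionalBlock() logic compares the client-supplied previous block against both the last and the penultimate block of the file. The following is a summary of the hunk above, with matches() standing in for Block.matchingIdAndGenStamp() and the other names shortened for readability:

    if (matches(previous, lastBlockInFile)) {
      // Normal case: client and NameNode agree; allocate the next block.
    } else if (previous == null && lastBlockIsCompleteFullBlock) {
      // Case 1: append() pipeline starting exactly at a block boundary.
    } else if (matches(previous, penultimateBlock)
        && lastBlockInFile.getNumBytes() == 0) {
      // Case 2: retry of a getAdditionalBlock() whose response was lost;
      // abandon the empty last block and allocate a fresh one.
    } else {
      // Case 3: inconsistent request; fail rather than corrupt the file.
    }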
@@ -2104,7 +2160,29 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new SafeModeException("Cannot complete file " + src, safeMode);
     }
 
-    INodeFileUnderConstruction pendingFile = checkLease(src, holder);
+    INodeFileUnderConstruction pendingFile;
+    try {
+      pendingFile = checkLease(src, holder);
+    } catch (LeaseExpiredException lee) {
+      INodeFile file = dir.getFileINode(src);
+      if (file != null && !file.isUnderConstruction()) {
+        // This could be a retry RPC - i.e. the client tried to close
+        // the file, but missed the RPC response. Thus, it is trying
+        // again to close the file. If the file still exists and
+        // the client's view of the last block matches the actual
+        // last block, then we'll treat it as a successful close.
+        // See HDFS-3031.
+        Block realLastBlock = file.getLastBlock();
+        if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
+          NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: " +
+              "received request from " + holder + " to complete file " + src +
+              " which is already closed. But, it appears to be an RPC " +
+              "retry. Returning success.");
+          return true;
+        }
+      }
+      throw lee;
+    }
     // commit the last block and complete it if it has minimum replicas
     commitOrCompleteLastBlock(pendingFile, last);
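The effect is that a duplicate complete() arriving after the file has already been closed (and its lease released) returns true instead of surfacing a LeaseExpiredException, as long as the caller's view of the last block matches reality. From the client's perspective, the lost-response sequence looks like this hypothetical fragment:

    // Attempt 1 reaches the NameNode, which closes the file, but the
    // response is lost to a network timeout or an HA failover.
    boolean closed = namenode.complete(src, clientName, lastBlock);

    // Previously the retry failed with LeaseExpiredException, since the
    // lease was released by attempt 1. With this change it is recognized
    // as a retry and returns true, so the client's close() completes.
    closed = namenode.complete(src, clientName, lastBlock);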
@@ -1658,19 +1658,14 @@ public class MiniDFSCluster {
     return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
   }
 
-  private HAServiceProtocol getHaServiceClient(int nnIndex) throws IOException {
-    InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
-    return new HAServiceProtocolClientSideTranslatorPB(addr, conf);
-  }
-
   public void transitionToActive(int nnIndex) throws IOException,
       ServiceFailedException {
-    HAServiceProtocolHelper.transitionToActive(getHaServiceClient(nnIndex));
+    getNameNode(nnIndex).getRpcServer().transitionToActive();
   }
 
   public void transitionToStandby(int nnIndex) throws IOException,
       ServiceFailedException {
-    HAServiceProtocolHelper.transitionToStandby(getHaServiceClient(nnIndex));
+    getNameNode(nnIndex).getRpcServer().transitionToStandby();
   }
 
@@ -63,10 +63,13 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.mockito.Mockito;
 import org.mockito.internal.stubbing.answers.ThrowsException;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.base.Joiner;
+
 /**
  * These tests make sure that DFSClient retries fetching data from DFS
  * properly in case of errors.
@@ -297,6 +300,100 @@ public class TestDFSClientRetries extends TestCase {
     }
   }
 
+  /**
+   * Test that getAdditionalBlock() and close() are idempotent. This allows
+   * a client to safely retry a call and still produce a correct
+   * file. See HDFS-3031.
+   */
+  public void testIdempotentAllocateBlockAndClose() throws Exception {
+    final String src = "/testIdempotentAllocateBlock";
+    Path file = new Path(src);
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
+      NamenodeProtocols spyNN = spy(preSpyNN);
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+
+      // Make addBlock() get called twice, as if it were retried
+      // due to an IPC issue.
+      doAnswer(new Answer<LocatedBlock>() {
+        @Override
+        public LocatedBlock answer(InvocationOnMock invocation) throws Throwable {
+          LocatedBlock ret = (LocatedBlock) invocation.callRealMethod();
+          LocatedBlocks lb = cluster.getNameNodeRpc().getBlockLocations(src, 0, Long.MAX_VALUE);
+          int blockCount = lb.getLocatedBlocks().size();
+          assertEquals(lb.getLastLocatedBlock().getBlock(), ret.getBlock());
+
+          // Retrying should result in a new block at the end of the file.
+          // (abandoning the old one)
+          LocatedBlock ret2 = (LocatedBlock) invocation.callRealMethod();
+          lb = cluster.getNameNodeRpc().getBlockLocations(src, 0, Long.MAX_VALUE);
+          int blockCount2 = lb.getLocatedBlocks().size();
+          assertEquals(lb.getLastLocatedBlock().getBlock(), ret2.getBlock());
+
+          // We shouldn't have gained an extra block by the RPC.
+          assertEquals(blockCount, blockCount2);
+          return ret2;
+        }
+      }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
+          Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
+
+      doAnswer(new Answer<Boolean>() {
+
+        @Override
+        public Boolean answer(InvocationOnMock invocation) throws Throwable {
+          // complete() may return false a few times before it returns
+          // true. We want to wait until it returns true, and then
+          // make it retry one more time after that.
+          LOG.info("Called complete(" +
+              Joiner.on(",").join(invocation.getArguments()) + ")");
+          if (!(Boolean)invocation.callRealMethod()) {
+            LOG.info("Complete call returned false, not faking a retry RPC");
+            return false;
+          }
+          // We got a successful close. Call it again to check idempotence.
+          try {
+            boolean ret = (Boolean) invocation.callRealMethod();
+            LOG.info("Complete call returned true, faked second RPC. " +
+                "Returned: " + ret);
+            return ret;
+          } catch (Throwable t) {
+            LOG.error("Idempotent retry threw exception", t);
+            throw t;
+          }
+        }
+      }).when(spyNN).complete(Mockito.anyString(), Mockito.anyString(),
+          Mockito.<ExtendedBlock>any());
+
+      OutputStream stm = client.create(file.toString(), true);
+      try {
+        AppendTestUtil.write(stm, 0, 10000);
+        stm.close();
+        stm = null;
+      } finally {
+        IOUtils.cleanup(LOG, stm);
+      }
+
+      // Make sure the mock was actually properly injected.
+      Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
+          Mockito.anyString(), Mockito.anyString(),
+          Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
+      Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
+          Mockito.anyString(), Mockito.anyString(),
+          Mockito.<ExtendedBlock>any());
+
+      AppendTestUtil.check(fs, file, 10000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Mock Answer implementation of NN.getBlockLocations that will return
    * a poisoned block list a certain number of times before returning
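The test simulates a duplicated RPC without touching the wire: it wraps the NameNode's RPC handler in a Mockito spy and, inside doAnswer(), invokes the real method twice, returning only the second result to the caller. The same pattern in miniature, on an arbitrary spyable object; a generic sketch, not HDFS code:

    import static org.mockito.Mockito.*;
    import java.util.ArrayList;
    import java.util.List;
    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    List<String> target = spy(new ArrayList<String>());
    doAnswer(new Answer<Integer>() {
      @Override
      public Integer answer(InvocationOnMock invocation) throws Throwable {
        Integer first = (Integer) invocation.callRealMethod();  // real call
        Integer second = (Integer) invocation.callRealMethod(); // the "retry"
        return second;  // the caller only ever sees the retried result
      }
    }).when(target).size();

    target.add("x");
    int n = target.size();  // size() actually ran twice; n is still 1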
@@ -21,10 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -43,9 +39,13 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 /** This class implements some of tests posted in HADOOP-2658. */
-public class TestFileAppend3 extends junit.framework.TestCase {
+public class TestFileAppend3 {
   {
     ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
@@ -64,9 +64,8 @@ public class TestFileAppend3 extends junit.framework.TestCase {
   private static MiniDFSCluster cluster;
   private static DistributedFileSystem fs;
 
-  public static Test suite() {
-    return new TestSetup(new TestSuite(TestFileAppend3.class)) {
-      protected void setUp() throws java.lang.Exception {
+  @BeforeClass
+  public static void setUp() throws java.lang.Exception {
     AppendTestUtil.LOG.info("setUp()");
     conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
@@ -75,18 +74,18 @@ public class TestFileAppend3 extends junit.framework.TestCase {
     fs = (DistributedFileSystem)cluster.getFileSystem();
   }
 
-      protected void tearDown() throws Exception {
+  @AfterClass
+  public static void tearDown() throws Exception {
     AppendTestUtil.LOG.info("tearDown()");
     if(fs != null) fs.close();
     if(cluster != null) cluster.shutdown();
   }
-    };
-  }
 
   /**
    * TC1: Append on block boundary.
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testTC1() throws Exception {
     final Path p = new Path("/TC1/foo");
     System.out.println("p=" + p);
@@ -115,6 +114,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    * TC2: Append on non-block boundary.
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testTC2() throws Exception {
     final Path p = new Path("/TC2/foo");
     System.out.println("p=" + p);
@@ -145,6 +145,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    * TC5: Only one simultaneous append.
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testTC5() throws Exception {
     final Path p = new Path("/TC5/foo");
     System.out.println("p=" + p);
@@ -175,6 +176,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    * TC7: Corrupted replicas are present.
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testTC7() throws Exception {
     final short repl = 2;
     final Path p = new Path("/TC7/foo");
@@ -224,6 +226,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    * TC11: Racing rename
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testTC11() throws Exception {
     final Path p = new Path("/TC11/foo");
     System.out.println("p=" + p);
@@ -282,6 +285,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    * TC12: Append to partial CRC chunk
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testTC12() throws Exception {
     final Path p = new Path("/TC12/foo");
     System.out.println("p=" + p);
@@ -313,6 +317,7 @@ public class TestFileAppend3 extends junit.framework.TestCase {
    * *
    * @throws IOException
    */
+  @Test
   public void testAppendToPartialChunk() throws IOException {
     final Path p = new Path("/partialChunk/foo");
     final int fileLen = 513;
@@ -86,11 +86,51 @@ public class TestPipelinesFailover {
   private static final int STRESS_NUM_THREADS = 25;
   private static final int STRESS_RUNTIME = 40000;
 
+  enum TestScenario {
+    GRACEFUL_FAILOVER {
+      void run(MiniDFSCluster cluster) throws IOException {
+        cluster.transitionToStandby(0);
+        cluster.transitionToActive(1);
+      }
+    },
+    ORIGINAL_ACTIVE_CRASHED {
+      void run(MiniDFSCluster cluster) throws IOException {
+        cluster.restartNameNode(0);
+        cluster.transitionToActive(1);
+      }
+    };
+
+    abstract void run(MiniDFSCluster cluster) throws IOException;
+  }
+
+  enum MethodToTestIdempotence {
+    ALLOCATE_BLOCK,
+    COMPLETE_FILE;
+  }
+
   /**
    * Tests continuing a write pipeline over a failover.
    */
   @Test(timeout=30000)
-  public void testWriteOverFailover() throws Exception {
+  public void testWriteOverGracefulFailover() throws Exception {
+    doWriteOverFailoverTest(TestScenario.GRACEFUL_FAILOVER,
+        MethodToTestIdempotence.ALLOCATE_BLOCK);
+  }
+
+  @Test(timeout=30000)
+  public void testAllocateBlockAfterCrashFailover() throws Exception {
+    doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED,
+        MethodToTestIdempotence.ALLOCATE_BLOCK);
+  }
+
+  @Test(timeout=30000)
+  public void testCompleteFileAfterCrashFailover() throws Exception {
+    doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED,
+        MethodToTestIdempotence.COMPLETE_FILE);
+  }
+
+  private void doWriteOverFailoverTest(TestScenario scenario,
+      MethodToTestIdempotence methodToTest) throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     // Don't check replication periodically.
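Factoring the two failover styles into a TestScenario enum with per-constant run() bodies lets a single test body cover both a graceful failover and a crash of the original active NameNode. The enum-with-behavior idiom in isolation, as a generic Java sketch unrelated to HDFS:

    enum Scenario {
      FAST {
        @Override void run() { System.out.println("shortcut path"); }
      },
      SAFE {
        @Override void run() { System.out.println("checked path"); }
      };

      abstract void run();
    }

    // A single driver can then be parameterized by the scenario:
    for (Scenario s : Scenario.values()) {
      s.run();
    }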
@@ -102,6 +142,8 @@ public class TestPipelinesFailover {
         .numDataNodes(3)
         .build();
     try {
+      int sizeWritten = 0;
+
       cluster.waitActive();
       cluster.transitionToActive(0);
       Thread.sleep(500);
@@ -112,28 +154,39 @@ public class TestPipelinesFailover {
 
       // write a block and a half
       AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
+      sizeWritten += BLOCK_AND_A_HALF;
+
       // Make sure all of the blocks are written out before failover.
       stm.hflush();
 
       LOG.info("Failing over to NN 1");
-      cluster.transitionToStandby(0);
-      cluster.transitionToActive(1);
+      scenario.run(cluster);
+
+      // NOTE: explicitly do *not* make any further metadata calls
+      // to the NN here. The next IPC call should be to allocate the next
+      // block. Any other call would notice the failover and not test
+      // idempotence of the operation (HDFS-3031)
 
-      assertTrue(fs.exists(TEST_PATH));
       FSNamesystem ns1 = cluster.getNameNode(1).getNamesystem();
       BlockManagerTestUtil.updateState(ns1.getBlockManager());
       assertEquals(0, ns1.getPendingReplicationBlocks());
       assertEquals(0, ns1.getCorruptReplicaBlocks());
       assertEquals(0, ns1.getMissingBlocksCount());
 
+      // If we're testing allocateBlock()'s idempotence, write another
+      // block and a half, so we have to allocate a new block.
+      // Otherwise, don't write anything, so our next RPC will be
+      // completeFile() if we're testing idempotence of that operation.
+      if (methodToTest == MethodToTestIdempotence.ALLOCATE_BLOCK) {
        // write another block and a half
-      AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
+        AppendTestUtil.write(stm, sizeWritten, BLOCK_AND_A_HALF);
+        sizeWritten += BLOCK_AND_A_HALF;
+      }
 
       stm.close();
       stm = null;
 
-      AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE * 3);
+      AppendTestUtil.check(fs, TEST_PATH, sizeWritten);
     } finally {
       IOUtils.closeStream(stm);
       cluster.shutdown();
@@ -146,7 +199,18 @@ public class TestPipelinesFailover {
    * even when the pipeline was constructed on a different NN.
    */
   @Test(timeout=30000)
-  public void testWriteOverFailoverWithDnFail() throws Exception {
+  public void testWriteOverGracefulFailoverWithDnFail() throws Exception {
+    doTestWriteOverFailoverWithDnFail(TestScenario.GRACEFUL_FAILOVER);
+  }
+
+  @Test(timeout=30000)
+  public void testWriteOverCrashFailoverWithDnFail() throws Exception {
+    doTestWriteOverFailoverWithDnFail(TestScenario.ORIGINAL_ACTIVE_CRASHED);
+  }
+
+
+  private void doTestWriteOverFailoverWithDnFail(TestScenario scenario)
+      throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
 
@@ -171,8 +235,7 @@ public class TestPipelinesFailover {
       stm.hflush();
 
       LOG.info("Failing over to NN 1");
-      cluster.transitionToStandby(0);
-      cluster.transitionToActive(1);
+      scenario.run(cluster);
 
       assertTrue(fs.exists(TEST_PATH));
 
@@ -183,8 +246,8 @@ public class TestPipelinesFailover {
       stm.hflush();
 
       LOG.info("Failing back to NN 0");
-      cluster.transitionToStandby(0);
-      cluster.transitionToActive(1);
+      cluster.transitionToStandby(1);
+      cluster.transitionToActive(0);
 
       cluster.stopDataNode(1);
 
@@ -67,8 +67,8 @@ public class TestStateTransitionFailure {
         fail("Transitioned to active but should not have been able to.");
       } catch (ServiceFailedException sfe) {
         assertExceptionContains("Error encountered requiring NN shutdown. " +
-            "Shutting down immediately.", sfe);
-        LOG.info("got expected exception", sfe);
+            "Shutting down immediately.", sfe.getCause());
+        LOG.info("got expected exception", sfe.getCause());
       }
       verify(mockRuntime, times(1)).exit(anyInt());
     } finally {