From 8be9441b9b13bea6e23c2cbcf638162c93052740 Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Sat, 11 Feb 2012 01:20:59 +0000
Subject: [PATCH] HDFS-2878. Fix TestBlockRecovery and move it back into main
 test directory. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1242995 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 +
 .../hdfs/server/datanode/BPOfferService.java  |  3 +-
 .../hadoop/hdfs/server/datanode/DataNode.java | 15 +++-
 .../server/datanode/TestBlockRecovery.java    | 68 +++++++++++++++----
 4 files changed, 72 insertions(+), 17 deletions(-)
 rename hadoop-hdfs-project/hadoop-hdfs/src/test/{unit => java}/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java (89%)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c9377e27e2c..9a5bbcbe045 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -129,6 +129,9 @@ Trunk (unreleased changes)
     HDFS-2486. Remove unnecessary priority level checks in
     UnderReplicatedBlocks. (Uma Maheswara Rao G via szetszwo)
 
+    HDFS-2878. Fix TestBlockRecovery and move it back into main test directory.
+    (todd)
+
   OPTIMIZATIONS
 
     HDFS-2477. Optimize computing the diff between a block report and the
     namenode state. (Tomasz Nykiel via hairong)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 111fbee2852..5b1ed7c5a5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -229,8 +229,7 @@ private void checkNNVersion(NamespaceInfo nsInfo)
 
   private void connectToNNAndHandshake() throws IOException {
     // get NN proxy
-    bpNamenode = new DatanodeProtocolClientSideTranslatorPB(nnAddr,
-        dn.getConf());
+    bpNamenode = dn.connectToNN(nnAddr);
 
     // First phase of the handshake with NN - get the namespace
     // info.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b7a91696b64..031a57eaa20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1015,6 +1015,14 @@ protected Socket newSocket() throws IOException {
            SocketChannel.open().socket() : new Socket();
   }
 
+  /**
+   * Connect to the NN. This is separated out for easier testing.
+   */
+  DatanodeProtocolClientSideTranslatorPB connectToNN(
+      InetSocketAddress nnAddr) throws IOException {
+    return new DatanodeProtocolClientSideTranslatorPB(nnAddr, conf);
+  }
+
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
       DatanodeID datanodeid, final Configuration conf, final int socketTimeout)
     throws IOException {
@@ -1982,8 +1990,10 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException {
   public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
       throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
-    if(bpos == null || bpos.bpNamenode == null) {
-      throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
+    if (bpos == null) {
+      throw new IOException("No block pool offer service for bpid=" + bpid);
+    } else if (bpos.bpNamenode == null) {
+      throw new IOException("cannot find a namenode proxy for bpid=" + bpid);
     }
     return bpos.bpNamenode;
   }
@@ -2325,5 +2335,4 @@ DNConf getDnConf() {
   boolean shouldRun() {
     return shouldRun;
   }
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
similarity index 89%
rename from hadoop-hdfs-project/hadoop-hdfs/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 7dc5e86e688..cb4244132b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
@@ -39,23 +41,30 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.*;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -72,6 +81,8 @@ public class TestBlockRecovery {
   private final static long RECOVERY_ID = 3000L;
   private final static String CLUSTER_ID = "testClusterID";
   private final static String POOL_ID = "BP-TEST";
+  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
+      "localhost", 5020);
   private final static long BLOCK_ID = 1000L;
   private final static long GEN_STAMP = 2000L;
   private final static long BLOCK_LEN = 3000L;
@@ -80,9 +91,6 @@ public class TestBlockRecovery {
   private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
       BLOCK_ID, BLOCK_LEN, GEN_STAMP);
 
-  private final NamespaceInfo nsifno =
-      new NamespaceInfo(1,CLUSTER_ID, POOL_ID, 2, 3);
-
   static {
     ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
@@ -99,21 +107,54 @@ public void startUp() throws IOException {
     conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
-    FileSystem.setDefaultUri(conf, "hdfs://localhost:5020");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    FileSystem.setDefaultUri(conf,
+        "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
     ArrayList<File> dirs = new ArrayList<File>();
     File dataDir = new File(DATA_DIR);
     FileUtil.fullyDelete(dataDir);
     dataDir.mkdirs();
     dirs.add(dataDir);
-    DatanodeProtocol namenode = mock(DatanodeProtocol.class);
+    final DatanodeProtocolClientSideTranslatorPB namenode =
+      mock(DatanodeProtocolClientSideTranslatorPB.class);
+
+    Mockito.doAnswer(new Answer<DatanodeRegistration>() {
+      @Override
+      public DatanodeRegistration answer(InvocationOnMock invocation)
+          throws Throwable {
+        return (DatanodeRegistration) invocation.getArguments()[0];
+      }
+    }).when(namenode).registerDatanode(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.any(DatanodeStorage[].class));
+
     when(namenode.versionRequest()).thenReturn(new NamespaceInfo
         (1, CLUSTER_ID, POOL_ID, 1L, 1));
-    when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(),
-        anyLong(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt()))
+
+    when(namenode.sendHeartbeat(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.any(StorageReport[].class),
+        Mockito.anyInt(),
+        Mockito.anyInt(),
+        Mockito.anyInt()))
         .thenReturn(new DatanodeCommand[0]);
-    dn = new DataNode(conf, dirs, null);
-
-    DataNodeTestUtils.setBPNamenodeByIndex(dn, nsifno, POOL_ID, namenode);
+
+    dn = new DataNode(conf, dirs, null) {
+      @Override
+      DatanodeProtocolClientSideTranslatorPB connectToNN(
+          InetSocketAddress nnAddr) throws IOException {
+        Assert.assertEquals(NN_ADDR, nnAddr);
+        return namenode;
+      }
+    };
+    dn.runDatanodeDaemon();
+    while (!dn.isDatanodeFullyStarted()) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        fail("Interrupted starting DN");
+      }
+    }
   }
 
   /**
@@ -355,9 +396,11 @@ public void testRWRReplicas() throws IOException {
 
   private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
     Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1);
+    DatanodeInfo mockOtherDN = new DatanodeInfo(
+        new DatanodeID("127.0.0.1", "storage-1234", 0, 0));
     DatanodeInfo[] locs = new DatanodeInfo[] {
         new DatanodeInfo(dn.getDNRegistrationForBP(block.getBlockPoolId())),
-        mock(DatanodeInfo.class) };
+        mockOtherDN };
     RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID);
     blocks.add(rBlock);
     return blocks;
@@ -495,7 +538,8 @@ public void testNotMatchedReplicaID() throws IOException {
     ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
     BlockWriteStreams streams = null;
     try {
-      streams = replicaInfo.createStreams(true, 0, 0);
+      streams = replicaInfo.createStreams(true,
+          DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
       streams.checksumOut.write('a');
       dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
       try {
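
The heart of this change is a testing seam: DataNode now obtains its NN proxy
through the package-private factory method connectToNN(), and
TestBlockRecovery#startUp overrides that method in an anonymous DataNode
subclass so the datanode starts up and registers against a Mockito mock
instead of a live NameNode. The sketch below shows that subclass-and-override
pattern in isolation. It is illustrative only: Node, NamenodeClient, and
SeamExample are invented names for this example, not Hadoop classes, and the
only assumed dependency is Mockito.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.InetSocketAddress;

// Stand-in for DatanodeProtocolClientSideTranslatorPB: the remote proxy
// that production code would create over the network.
class NamenodeClient {
  NamenodeClient(InetSocketAddress addr) {}
  String versionRequest() throws IOException {
    return "real-namespace-info"; // real code would issue an RPC here
  }
}

// Stand-in for DataNode; connectToNN is the overridable factory seam.
class Node {
  private final InetSocketAddress nnAddr;
  NamenodeClient nn;

  Node(InetSocketAddress nnAddr) {
    this.nnAddr = nnAddr;
  }

  // Package-private and overridable, mirroring DataNode#connectToNN:
  // the default still builds the real client, so production behavior
  // is unchanged.
  NamenodeClient connectToNN(InetSocketAddress addr) throws IOException {
    return new NamenodeClient(addr);
  }

  void start() throws IOException {
    nn = connectToNN(nnAddr); // the only place the proxy is created
  }
}

public class SeamExample {
  public static void main(String[] args) throws IOException {
    final NamenodeClient mockNN = mock(NamenodeClient.class);
    when(mockNN.versionRequest()).thenReturn("mock-namespace-info");

    // Subclass-and-override, as TestBlockRecovery#startUp does with its
    // anonymous DataNode subclass: no network connection is ever opened.
    Node node = new Node(new InetSocketAddress("localhost", 5020)) {
      @Override
      NamenodeClient connectToNN(InetSocketAddress addr) {
        return mockNN;
      }
    };
    node.start();
    System.out.println(node.nn.versionRequest()); // mock-namespace-info
  }
}

Exposing the seam as an overridable method, rather than injecting a proxy
factory through the constructor, keeps the production code path untouched:
only tests that subclass the node ever see different behavior, which is why
the test above can exercise real datanode startup without a NameNode.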