diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ea2309228e2..9701d2eb7c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -113,6 +113,9 @@ Release 0.23.3 - UNRELEASED
     HDFS-2410. Further cleanup of hardcoded configuration keys and values.
     (suresh)
 
+    HDFS-2878. Fix TestBlockRecovery and move it back into main test directory.
+    (todd)
+
   OPTIMIZATIONS
 
     HDFS-2477. Optimize computing the diff between a block report and the
     namenode state. (Tomasz Nykiel via hairong)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index ab37e494351..59c07b8bfb8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -229,8 +229,7 @@ class BPOfferService implements Runnable {
 
   private void connectToNNAndHandshake() throws IOException {
     // get NN proxy
-    bpNamenode = new DatanodeProtocolClientSideTranslatorPB(nnAddr,
-        dn.getConf());
+    bpNamenode = dn.connectToNN(nnAddr);
 
     // First phase of the handshake with NN - get the namespace
     // info.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 4c21724746e..dc5c3ce7058 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -988,6 +988,14 @@ public class DataNode extends Configured
         SocketChannel.open().socket() : new Socket();
   }
 
+  /**
+   * Connect to the NN. This is separated out for easier testing.
+   */
+  DatanodeProtocolClientSideTranslatorPB connectToNN(
+      InetSocketAddress nnAddr) throws IOException {
+    return new DatanodeProtocolClientSideTranslatorPB(nnAddr, conf);
+  }
+
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
       DatanodeID datanodeid, final Configuration conf, final int socketTimeout)
       throws IOException {
@@ -1936,8 +1944,10 @@ public class DataNode extends Configured
   public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
       throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
-    if(bpos == null || bpos.bpNamenode == null) {
-      throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
+    if (bpos == null) {
+      throw new IOException("No block pool offer service for bpid=" + bpid);
+    } else if (bpos.bpNamenode == null) {
+      throw new IOException("cannot find a namenode proxy for bpid=" + bpid);
     }
     return bpos.bpNamenode;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
similarity index 89%
rename from hadoop-hdfs-project/hadoop-hdfs/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 7dc5e86e688..cb4244132b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
@@ -39,23 +41,30 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.*;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -72,6 +81,8 @@ public class TestBlockRecovery {
   private final static long RECOVERY_ID = 3000L;
   private final static String CLUSTER_ID = "testClusterID";
   private final static String POOL_ID = "BP-TEST";
+  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
+      "localhost", 5020);
   private final static long BLOCK_ID = 1000L;
   private final static long GEN_STAMP = 2000L;
   private final static long BLOCK_LEN = 3000L;
@@ -80,9 +91,6 @@ public class TestBlockRecovery {
   private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
       BLOCK_ID, BLOCK_LEN, GEN_STAMP);
 
-  private final NamespaceInfo nsifno =
-      new NamespaceInfo(1,CLUSTER_ID, POOL_ID, 2, 3);
-
   static {
     ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
@@ -99,21 +107,54 @@ public class TestBlockRecovery {
     conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
-    FileSystem.setDefaultUri(conf, "hdfs://localhost:5020");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    FileSystem.setDefaultUri(conf,
+        "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
     ArrayList<File> dirs = new ArrayList<File>();
     File dataDir = new File(DATA_DIR);
     FileUtil.fullyDelete(dataDir);
     dataDir.mkdirs();
     dirs.add(dataDir);
-    DatanodeProtocol namenode = mock(DatanodeProtocol.class);
+    final DatanodeProtocolClientSideTranslatorPB namenode =
+        mock(DatanodeProtocolClientSideTranslatorPB.class);
+
+    Mockito.doAnswer(new Answer<DatanodeRegistration>() {
+      @Override
+      public DatanodeRegistration answer(InvocationOnMock invocation)
+          throws Throwable {
+        return (DatanodeRegistration) invocation.getArguments()[0];
+      }
+    }).when(namenode).registerDatanode(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.any(DatanodeStorage[].class));
+
     when(namenode.versionRequest()).thenReturn(new NamespaceInfo
         (1, CLUSTER_ID, POOL_ID, 1L, 1));
-    when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(),
-        anyLong(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt()))
+
+    when(namenode.sendHeartbeat(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.any(StorageReport[].class),
+        Mockito.anyInt(),
+        Mockito.anyInt(),
+        Mockito.anyInt()))
         .thenReturn(new DatanodeCommand[0]);
-    dn = new DataNode(conf, dirs, null);
-    
-    DataNodeTestUtils.setBPNamenodeByIndex(dn, nsifno, POOL_ID, namenode);
+
+    dn = new DataNode(conf, dirs, null) {
+      @Override
+      DatanodeProtocolClientSideTranslatorPB connectToNN(
+          InetSocketAddress nnAddr) throws IOException {
+        Assert.assertEquals(NN_ADDR, nnAddr);
+        return namenode;
+      }
+    };
+    dn.runDatanodeDaemon();
+    while (!dn.isDatanodeFullyStarted()) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        fail("Interrupted starting DN");
+      }
+    }
   }
 
   /**
@@ -355,9 +396,11 @@ public class TestBlockRecovery {
   private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
     Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1);
+    DatanodeInfo mockOtherDN = new DatanodeInfo(
+        new DatanodeID("127.0.0.1", "storage-1234", 0, 0));
     DatanodeInfo[] locs = new DatanodeInfo[] {
        new DatanodeInfo(dn.getDNRegistrationForBP(block.getBlockPoolId())),
-        mock(DatanodeInfo.class) };
+        mockOtherDN };
     RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID);
     blocks.add(rBlock);
     return blocks;
   }
@@ -495,7 +538,8 @@ public class TestBlockRecovery {
     ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
     BlockWriteStreams streams = null;
     try {
-      streams = replicaInfo.createStreams(true, 0, 0);
+      streams = replicaInfo.createStreams(true,
+          DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
       streams.checksumOut.write('a');
       dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
       try {