From 2b5fd341b969a6c920c27e06b158a5b608544f17 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Tue, 6 Apr 2021 00:19:21 +0530 Subject: [PATCH] HDFS-15940. Fixing and refactoring tests specific to Block recovery.(#2844). Contributed by Viraj Jasani Signed-off-by: Ayush Saxena Signed-off-by: Takanobu Asanuma --- .../server/datanode/TestBlockRecovery.java | 413 +++------------- .../server/datanode/TestBlockRecovery2.java | 463 ++++++++++++++++++ .../datanode/TestDataNodeReconfiguration.java | 4 - 3 files changed, 538 insertions(+), 342 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index ee522725e28..995a135c4e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -19,21 +19,14 @@ package org.apache.hadoop.hdfs.server.datanode; import org.apache.hadoop.hdfs.AppendTestUtil; -import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -44,16 +37,13 @@ import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -78,13 +68,11 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.StripedFileTestUtil; -import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -99,11 +87,9 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.GenericTestUtils.SleepAnswer; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; import org.slf4j.event.Level; @@ -298,13 +284,13 @@ public class TestBlockRecovery { } } - /** Sync two replicas */ - private void testSyncReplicas(ReplicaRecoveryInfo replica1, - ReplicaRecoveryInfo replica2, - InterDatanodeProtocol dn1, - InterDatanodeProtocol dn2, - long expectLen) throws IOException { - + /** + * Sync two replicas. + */ + private void testSyncReplicas(ReplicaRecoveryInfo replica1, + ReplicaRecoveryInfo replica2, InterDatanodeProtocol dn1, + InterDatanodeProtocol dn2) throws IOException { + DatanodeInfo[] locs = new DatanodeInfo[]{ mock(DatanodeInfo.class), mock(DatanodeInfo.class)}; RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID); @@ -315,7 +301,7 @@ public class TestBlockRecovery { DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn2, replica2); syncList.add(record1); syncList.add(record2); - + when(dn1.updateReplicaUnderRecovery(any(ExtendedBlock.class), anyLong(), anyLong(), anyLong())).thenReturn("storage1"); when(dn2.updateReplicaUnderRecovery(any(ExtendedBlock.class), anyLong(), @@ -325,7 +311,7 @@ public class TestBlockRecovery { recoveryWorker.new RecoveryTaskContiguous(rBlock); RecoveryTaskContiguous.syncBlock(syncList); } - + /** * BlockRecovery_02.8. * Two replicas are in Finalized state @@ -336,38 +322,38 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN1, GEN_STAMP-2, ReplicaState.FINALIZED); + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.FINALIZED); + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN1, GEN_STAMP - 2, ReplicaState.FINALIZED); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); - testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); + testSyncReplicas(replica1, replica2, dn1, dn2); verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); // two finalized replicas have different length - replica1 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - replica2 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED); + replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.FINALIZED); + replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN2, GEN_STAMP - 2, ReplicaState.FINALIZED); try { - testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); + testSyncReplicas(replica1, replica2, dn1, dn2); Assert.fail("Two finalized replicas should not have different lengthes!"); } catch (IOException e) { Assert.assertTrue(e.getMessage().startsWith( "Inconsistent size of finalized replicas. ")); } } - + /** * BlockRecovery_02.9. - * One replica is Finalized and another is RBW. + * One replica is Finalized and another is RBW. * @throws IOException in case of an error */ @Test(timeout=60000) @@ -375,80 +361,81 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - + // rbw and finalized replicas have the same length - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); - testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); + testSyncReplicas(replica1, replica2, dn1, dn2); verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); - + // rbw replica has a different length from the finalized one - replica1 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - replica2 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); + replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.FINALIZED); + replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN2, GEN_STAMP - 2, ReplicaState.RBW); dn1 = mock(InterDatanodeProtocol.class); dn2 = mock(InterDatanodeProtocol.class); - testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); + testSyncReplicas(replica1, replica2, dn1, dn2); verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); verify(dn2, never()).updateReplicaUnderRecovery( block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); } - + /** * BlockRecovery_02.10. - * One replica is Finalized and another is RWR. + * One replica is Finalized and another is RWR. + * * @throws IOException in case of an error */ - @Test(timeout=60000) + @Test(timeout = 60000) public void testFinalizedRwrReplicas() throws IOException { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - + // rbw and finalized replicas have the same length - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR); + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.FINALIZED); + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN1, GEN_STAMP - 2, ReplicaState.RWR); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); - testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); + testSyncReplicas(replica1, replica2, dn1, dn2); verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); verify(dn2, never()).updateReplicaUnderRecovery( block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); - + // rbw replica has a different length from the finalized one - replica1 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); - replica2 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); + replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.FINALIZED); + replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN2, GEN_STAMP - 2, ReplicaState.RBW); dn1 = mock(InterDatanodeProtocol.class); dn2 = mock(InterDatanodeProtocol.class); - testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); + testSyncReplicas(replica1, replica2, dn1, dn2); verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); verify(dn2, never()).updateReplicaUnderRecovery( block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); } - + /** * BlockRecovery_02.11. * Two replicas are RBW. @@ -456,26 +443,27 @@ public class TestBlockRecovery { */ @Test(timeout=60000) public void testRBWReplicas() throws IOException { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.RBW); + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN2, GEN_STAMP - 2, ReplicaState.RBW); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2); - testSyncReplicas(replica1, replica2, dn1, dn2, minLen); + testSyncReplicas(replica1, replica2, dn1, dn2); verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); } - + /** * BlockRecovery_02.12. - * One replica is RBW and another is RWR. + * One replica is RBW and another is RWR. + * * @throws IOException in case of an error */ @Test(timeout=60000) @@ -483,44 +471,45 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR); + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.RBW); + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN1, GEN_STAMP - 2, ReplicaState.RWR); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); - testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); + testSyncReplicas(replica1, replica2, dn1, dn2); verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); verify(dn2, never()).updateReplicaUnderRecovery( block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); } - + /** - * BlockRecovery_02.13. + * BlockRecovery_02.13. * Two replicas are RWR. + * * @throws IOException in case of an error */ @Test(timeout=60000) public void testRWRReplicas() throws IOException { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RWR); - ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, - REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR); + ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN1, GEN_STAMP - 1, ReplicaState.RWR); + ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, + REPLICA_LEN2, GEN_STAMP - 2, ReplicaState.RWR); InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2); - testSyncReplicas(replica1, replica2, dn1, dn2, minLen); - + testSyncReplicas(replica1, replica2, dn1, dn2); + verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); - } + } private Collection initRecoveringBlocks() throws IOException { Collection blocks = new ArrayList(1); @@ -708,132 +697,6 @@ public class TestBlockRecovery { } } - @Test(timeout = 60000) - public void testEcRecoverBlocks() throws Throwable { - // Stop the Mocked DN started in startup() - tearDown(); - ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy(); - MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(8).build(); - - try { - cluster.waitActive(); - NamenodeProtocols preSpyNN = cluster.getNameNodeRpc(); - NamenodeProtocols spyNN = spy(preSpyNN); - - // Delay completeFile - GenericTestUtils.DelayAnswer delayer = - new GenericTestUtils.DelayAnswer(LOG); - doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), any(), - anyLong()); - String topDir = "/myDir"; - DFSClient client = new DFSClient(null, spyNN, conf, null); - Path file = new Path(topDir + "/testECLeaseRecover"); - client.mkdirs(topDir, null, false); - client.enableErasureCodingPolicy(ecPolicy.getName()); - client.setErasureCodingPolicy(topDir, ecPolicy.getName()); - OutputStream stm = client.create(file.toString(), true); - - // write 5MB File - AppendTestUtil.write(stm, 0, 1024 * 1024 * 5); - final AtomicReference err = new AtomicReference(); - Thread t = new Thread() { - @Override - public void run() { - try { - stm.close(); - } catch (Throwable t) { - err.set(t); - } - } - }; - t.start(); - - // Waiting for close to get to latch - delayer.waitForCall(); - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - try { - return client.getNamenode().recoverLease(file.toString(), - client.getClientName()); - } catch (IOException e) { - return false; - } - } - }, 5000, 24000); - delayer.proceed(); - } finally { - cluster.shutdown(); - } - } - - /** - * Test to verify the race between finalizeBlock and Lease recovery - * - * @throws Exception - */ - @Test(timeout = 20000) - public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws Exception { - tearDown();// Stop the Mocked DN started in startup() - - Configuration conf = new HdfsConfiguration(); - conf.set(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, "1000"); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(1).build(); - try { - cluster.waitClusterUp(); - DistributedFileSystem fs = cluster.getFileSystem(); - Path path = new Path("/test"); - FSDataOutputStream out = fs.create(path); - out.writeBytes("data"); - out.hsync(); - - List blocks = DFSTestUtil.getAllBlocks(fs.open(path)); - final LocatedBlock block = blocks.get(0); - final DataNode dataNode = cluster.getDataNodes().get(0); - - final AtomicBoolean recoveryInitResult = new AtomicBoolean(true); - Thread recoveryThread = new Thread() { - @Override - public void run() { - try { - DatanodeInfo[] locations = block.getLocations(); - final RecoveringBlock recoveringBlock = new RecoveringBlock( - block.getBlock(), locations, block.getBlock() - .getGenerationStamp() + 1); - try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) { - Thread.sleep(2000); - dataNode.initReplicaRecovery(recoveringBlock); - } - } catch (Exception e) { - recoveryInitResult.set(false); - } - } - }; - recoveryThread.start(); - try { - out.close(); - } catch (IOException e) { - Assert.assertTrue("Writing should fail", - e.getMessage().contains("are bad. Aborting...")); - } finally { - recoveryThread.join(); - } - Assert.assertTrue("Recovery should be initiated successfully", - recoveryInitResult.get()); - - dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock() - .getGenerationStamp() + 1, block.getBlock().getBlockId(), - block.getBlockSize()); - } finally { - if (null != cluster) { - cluster.shutdown(); - cluster = null; - } - } - } - /** * DNs report RUR instead of RBW, RWR or FINALIZED. Primary DN expected to * throw an exception. @@ -1107,57 +970,7 @@ public class TestBlockRecovery { } } - /** - * Test for block recovery taking longer than the heartbeat interval. - */ - @Test(timeout = 300000L) - public void testRecoverySlowerThanHeartbeat() throws Exception { - tearDown(); // Stop the Mocked DN started in startup() - - SleepAnswer delayer = new SleepAnswer(3000, 6000); - testRecoveryWithDatanodeDelayed(delayer); - } - - /** - * Test for block recovery timeout. All recovery attempts will be delayed - * and the first attempt will be lost to trigger recovery timeout and retry. - */ - @Test(timeout = 300000L) - public void testRecoveryTimeout() throws Exception { - tearDown(); // Stop the Mocked DN started in startup() - final Random r = new Random(); - - // Make sure first commitBlockSynchronization call from the DN gets lost - // for the recovery timeout to expire and new recovery attempt - // to be started. - SleepAnswer delayer = new SleepAnswer(3000) { - private final AtomicBoolean callRealMethod = new AtomicBoolean(); - - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - boolean interrupted = false; - try { - Thread.sleep(r.nextInt(3000) + 6000); - } catch (InterruptedException ie) { - interrupted = true; - } - try { - if (callRealMethod.get()) { - return invocation.callRealMethod(); - } - callRealMethod.set(true); - return null; - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - }; - testRecoveryWithDatanodeDelayed(delayer); - } - - private void testRecoveryWithDatanodeDelayed( + static void testRecoveryWithDatanodeDelayed( GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception { Configuration configuration = new HdfsConfiguration(); configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); @@ -1209,80 +1022,4 @@ public class TestBlockRecovery { } } - /** - * Test that block will be recovered even if there are less than the - * specified minReplication datanodes involved in its recovery. - * - * Check that, after recovering, the block will be successfully replicated. - */ - @Test(timeout = 300000L) - public void testRecoveryWillIgnoreMinReplication() throws Exception { - tearDown(); // Stop the Mocked DN started in startup() - - final int blockSize = 4096; - final int numReplicas = 3; - final String filename = "/testIgnoreMinReplication"; - final Path filePath = new Path(filename); - Configuration configuration = new HdfsConfiguration(); - configuration.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000); - configuration.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 2); - configuration.setLong(DFS_BLOCK_SIZE_KEY, blockSize); - MiniDFSCluster cluster = null; - - try { - cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(5) - .build(); - cluster.waitActive(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - final FSNamesystem fsn = cluster.getNamesystem(); - - // Create a file and never close the output stream to trigger recovery - FSDataOutputStream out = dfs.create(filePath, (short) numReplicas); - out.write(AppendTestUtil.randomBytes(0, blockSize)); - out.hsync(); - - DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", - cluster.getNameNodePort()), configuration); - LocatedBlock blk = dfsClient.getNamenode(). - getBlockLocations(filename, 0, blockSize). - getLastLocatedBlock(); - - // Kill 2 out of 3 datanodes so that only 1 alive, thus < minReplication - List dataNodes = Arrays.asList(blk.getLocations()); - assertEquals(dataNodes.size(), numReplicas); - for (DatanodeInfo dataNode : dataNodes.subList(0, numReplicas - 1)) { - cluster.stopDataNode(dataNode.getName()); - } - - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - return fsn.getNumDeadDataNodes() == 2; - } - }, 300, 300000); - - // Make sure hard lease expires to trigger replica recovery - cluster.setLeasePeriod(100L, 100L); - - // Wait for recovery to succeed - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - try { - return dfs.isFileClosed(filePath); - } catch (IOException e) {} - return false; - } - }, 300, 300000); - - // Wait for the block to be replicated - DFSTestUtil.waitForReplication(cluster, DFSTestUtil.getFirstBlock( - dfs, filePath), 1, numReplicas, 0); - - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java new file mode 100644 index 00000000000..03d5851f232 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.AppendTestUtil; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StripedFileTestUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.AutoCloseableLock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +/** + * Test part 2 for sync all replicas in block recovery. + */ +public class TestBlockRecovery2 { + + private static final Logger LOG = + LoggerFactory.getLogger(TestBlockRecovery2.class); + + private static final String DATA_DIR = + MiniDFSCluster.getBaseDirectory() + "data"; + + private DataNode dn; + private Configuration conf; + private boolean tearDownDone; + + private final static String CLUSTER_ID = "testClusterID"; + private final static String POOL_ID = "BP-TEST"; + private final static InetSocketAddress NN_ADDR = new InetSocketAddress( + "localhost", 5020); + + @Rule + public TestName currentTestName = new TestName(); + + static { + GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.TRACE); + GenericTestUtils.setLogLevel(LOG, Level.TRACE); + } + + /** + * Starts an instance of DataNode. + * @throws IOException + */ + @Before + public void startUp() throws IOException { + tearDownDone = false; + conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR); + conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0"); + conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0"); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + FileSystem.setDefaultUri(conf, + "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort()); + List locations = new ArrayList<>(); + File dataDir = new File(DATA_DIR); + FileUtil.fullyDelete(dataDir); + dataDir.mkdirs(); + StorageLocation location = StorageLocation.parse(dataDir.getPath()); + locations.add(location); + final DatanodeProtocolClientSideTranslatorPB namenode = + mock(DatanodeProtocolClientSideTranslatorPB.class); + + Mockito.doAnswer( + (Answer) invocation -> + (DatanodeRegistration) invocation.getArguments()[0]) + .when(namenode) + .registerDatanode(Mockito.any(DatanodeRegistration.class)); + + when(namenode.versionRequest()) + .thenReturn(new NamespaceInfo(1, CLUSTER_ID, POOL_ID, 1L)); + + when(namenode.sendHeartbeat( + Mockito.any(), + Mockito.any(), + Mockito.anyLong(), + Mockito.anyLong(), + Mockito.anyInt(), + Mockito.anyInt(), + Mockito.anyInt(), + Mockito.any(), + Mockito.anyBoolean(), + Mockito.any(), + Mockito.any())) + .thenReturn(new HeartbeatResponse( + new DatanodeCommand[0], + new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 1), + null, ThreadLocalRandom.current().nextLong() | 1L)); + + dn = new DataNode(conf, locations, null, null) { + @Override + DatanodeProtocolClientSideTranslatorPB connectToNN( + InetSocketAddress nnAddr) throws IOException { + Assert.assertEquals(NN_ADDR, nnAddr); + return namenode; + } + }; + // Trigger a heartbeat so that it acknowledges the NN as active. + dn.getAllBpOs().get(0).triggerHeartbeatForTests(); + waitForActiveNN(); + } + + /** + * Wait for active NN up to 15 seconds. + */ + private void waitForActiveNN() { + try { + GenericTestUtils.waitFor(() -> + dn.getAllBpOs().get(0).getActiveNN() != null, 1000, 15 * 1000); + } catch (TimeoutException e) { + // Here its not failing, will again do the assertions for activeNN after + // this waiting period and fails there if BPOS has not acknowledged + // any NN as active. + LOG.warn("Failed to get active NN", e); + } catch (InterruptedException e) { + LOG.warn("InterruptedException while waiting to see active NN", e); + } + Assert.assertNotNull("Failed to get ActiveNN", + dn.getAllBpOs().get(0).getActiveNN()); + } + + /** + * Cleans the resources and closes the instance of datanode. + * @throws IOException if an error occurred + */ + @After + public void tearDown() throws IOException { + if (!tearDownDone && dn != null) { + try { + dn.shutdown(); + } catch(Exception e) { + LOG.error("Cannot close: ", e); + } finally { + File dir = new File(DATA_DIR); + if (dir.exists()) { + Assert.assertTrue( + "Cannot delete data-node dirs", FileUtil.fullyDelete(dir)); + } + } + tearDownDone = true; + } + } + + /** + * Test to verify the race between finalizeBlock and Lease recovery. + * + * @throws Exception + */ + @Test(timeout = 20000) + public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() + throws Exception { + // Stop the Mocked DN started in startup() + tearDown(); + + Configuration configuration = new HdfsConfiguration(); + configuration.set( + DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, "1000"); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(configuration) + .numDataNodes(1).build(); + try { + cluster.waitClusterUp(); + DistributedFileSystem fs = cluster.getFileSystem(); + Path path = new Path("/test"); + FSDataOutputStream out = fs.create(path); + out.writeBytes("data"); + out.hsync(); + + List blocks = DFSTestUtil.getAllBlocks(fs.open(path)); + final LocatedBlock block = blocks.get(0); + final DataNode dataNode = cluster.getDataNodes().get(0); + + final AtomicBoolean recoveryInitResult = new AtomicBoolean(true); + Thread recoveryThread = new Thread(() -> { + try { + DatanodeInfo[] locations = block.getLocations(); + final BlockRecoveryCommand.RecoveringBlock recoveringBlock = + new BlockRecoveryCommand.RecoveringBlock(block.getBlock(), + locations, block.getBlock().getGenerationStamp() + 1); + try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) { + Thread.sleep(2000); + dataNode.initReplicaRecovery(recoveringBlock); + } + } catch (Exception e) { + recoveryInitResult.set(false); + } + }); + recoveryThread.start(); + try { + out.close(); + } catch (IOException e) { + Assert.assertTrue("Writing should fail", + e.getMessage().contains("are bad. Aborting...")); + } finally { + recoveryThread.join(); + } + Assert.assertTrue("Recovery should be initiated successfully", + recoveryInitResult.get()); + + dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock() + .getGenerationStamp() + 1, block.getBlock().getBlockId(), + block.getBlockSize()); + } finally { + if (null != cluster) { + cluster.shutdown(); + } + } + } + + /** + * Test for block recovery timeout. All recovery attempts will be delayed + * and the first attempt will be lost to trigger recovery timeout and retry. + */ + @Test(timeout = 300000L) + public void testRecoveryTimeout() throws Exception { + tearDown(); // Stop the Mocked DN started in startup() + final Random r = new Random(); + + // Make sure first commitBlockSynchronization call from the DN gets lost + // for the recovery timeout to expire and new recovery attempt + // to be started. + GenericTestUtils.SleepAnswer delayer = + new GenericTestUtils.SleepAnswer(3000) { + private final AtomicBoolean callRealMethod = new AtomicBoolean(); + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + boolean interrupted = false; + try { + Thread.sleep(r.nextInt(3000) + 6000); + } catch (InterruptedException ie) { + interrupted = true; + } + try { + if (callRealMethod.get()) { + return invocation.callRealMethod(); + } + callRealMethod.set(true); + return null; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + }; + TestBlockRecovery.testRecoveryWithDatanodeDelayed(delayer); + } + + /** + * Test for block recovery taking longer than the heartbeat interval. + */ + @Test(timeout = 300000L) + public void testRecoverySlowerThanHeartbeat() throws Exception { + tearDown(); // Stop the Mocked DN started in startup() + + GenericTestUtils.SleepAnswer delayer = + new GenericTestUtils.SleepAnswer(3000, 6000); + TestBlockRecovery.testRecoveryWithDatanodeDelayed(delayer); + } + + @Test(timeout = 60000) + public void testEcRecoverBlocks() throws Throwable { + // Stop the Mocked DN started in startup() + tearDown(); + ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy(); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(8).build(); + + try { + cluster.waitActive(); + NamenodeProtocols preSpyNN = cluster.getNameNodeRpc(); + NamenodeProtocols spyNN = spy(preSpyNN); + + // Delay completeFile + GenericTestUtils.DelayAnswer delayer = + new GenericTestUtils.DelayAnswer(LOG); + doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), any(), + anyLong()); + String topDir = "/myDir"; + DFSClient client = new DFSClient(null, spyNN, conf, null); + Path file = new Path(topDir + "/testECLeaseRecover"); + client.mkdirs(topDir, null, false); + client.enableErasureCodingPolicy(ecPolicy.getName()); + client.setErasureCodingPolicy(topDir, ecPolicy.getName()); + OutputStream stm = client.create(file.toString(), true); + + // write 5MB File + AppendTestUtil.write(stm, 0, 1024 * 1024 * 5); + final AtomicReference err = new AtomicReference<>(); + Thread t = new Thread(() -> { + try { + stm.close(); + } catch (Throwable t1) { + err.set(t1); + } + }); + t.start(); + + // Waiting for close to get to latch + delayer.waitForCall(); + GenericTestUtils.waitFor(() -> { + try { + return client.getNamenode().recoverLease(file.toString(), + client.getClientName()); + } catch (IOException e) { + return false; + } + }, 5000, 24000); + delayer.proceed(); + } finally { + cluster.shutdown(); + } + } + + /** + * Test that block will be recovered even if there are less than the + * specified minReplication datanodes involved in its recovery. + * + * Check that, after recovering, the block will be successfully replicated. + */ + @Test(timeout = 300000L) + public void testRecoveryWillIgnoreMinReplication() throws Exception { + tearDown(); // Stop the Mocked DN started in startup() + + final int blockSize = 4096; + final int numReplicas = 3; + final String filename = "/testIgnoreMinReplication"; + final Path filePath = new Path(filename); + Configuration configuration = new HdfsConfiguration(); + configuration.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000); + configuration.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 2); + configuration.setLong(DFS_BLOCK_SIZE_KEY, blockSize); + MiniDFSCluster cluster = null; + + try { + cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(5) + .build(); + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + final FSNamesystem fsn = cluster.getNamesystem(); + + // Create a file and never close the output stream to trigger recovery + FSDataOutputStream out = dfs.create(filePath, (short) numReplicas); + out.write(AppendTestUtil.randomBytes(0, blockSize)); + out.hsync(); + + DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", + cluster.getNameNodePort()), configuration); + LocatedBlock blk = dfsClient.getNamenode(). + getBlockLocations(filename, 0, blockSize). + getLastLocatedBlock(); + + // Kill 2 out of 3 datanodes so that only 1 alive, thus < minReplication + List dataNodes = Arrays.asList(blk.getLocations()); + assertEquals(dataNodes.size(), numReplicas); + for (DatanodeInfo dataNode : dataNodes.subList(0, numReplicas - 1)) { + cluster.stopDataNode(dataNode.getName()); + } + + GenericTestUtils.waitFor(() -> fsn.getNumDeadDataNodes() == 2, + 300, 300000); + + // Make sure hard lease expires to trigger replica recovery + cluster.setLeasePeriod(100L, 100L); + + // Wait for recovery to succeed + GenericTestUtils.waitFor(() -> { + try { + return dfs.isFileClosed(filePath); + } catch (IOException e) { + LOG.info("Something went wrong.", e); + } + return false; + }, 300, 300000); + + // Wait for the block to be replicated + DFSTestUtil.waitForReplication(cluster, DFSTestUtil.getFirstBlock( + dfs, filePath), 1, numReplicas, 0); + + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java index ff3b3eabc3b..8cbd38bc601 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java @@ -28,8 +28,6 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationException; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -48,8 +46,6 @@ import org.junit.Test; */ public class TestDataNodeReconfiguration { - private static final Logger LOG = - LoggerFactory.getLogger(TestBlockRecovery.class); private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory() + "data"; private final static InetSocketAddress NN_ADDR = new InetSocketAddress(