From c6b1125a241b7d33a7fb5cfc6ddbe44dd1996b90 Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Fri, 16 Mar 2018 10:29:19 -0700 Subject: [PATCH] HDFS-12886. Ignore minReplication for block recovery. Contributed by Lukas Majercak. (cherry picked from commit 08ff1586d5d3e39f546200f9e696f62ea4cf000d) --- .../server/blockmanagement/BlockManager.java | 7 ++ .../server/datanode/TestBlockRecovery.java | 83 +++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 757016b216a..9b8f74ce70a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -994,6 +994,13 @@ public class BlockManager implements BlockStatsMXBean { addExpectedReplicasToPending(lastBlock); } completeBlock(lastBlock, iip, false); + } else if (pendingRecoveryBlocks.isUnderRecovery(lastBlock)) { + // We've just finished recovery for this block, complete + // the block forcibly disregarding number of replicas. + // This is to ignore minReplication, the block will be closed + // and then replicated out. + completeBlock(lastBlock, iip, true); + updateNeededReconstructions(lastBlock, 1, 0); } return committed; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index e9bd7a8e031..07fd4ae58d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -19,9 +19,14 @@ package org.apache.hadoop.hdfs.server.datanode; import org.apache.hadoop.hdfs.AppendTestUtil; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -42,6 +47,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -1142,4 +1148,81 @@ public class TestBlockRecovery { } } } + + /** + * Test that block will be recovered even if there are less than the + * specified minReplication datanodes involved in its recovery. + * + * Check that, after recovering, the block will be successfully replicated. + */ + @Test(timeout = 300000L) + public void testRecoveryWillIgnoreMinReplication() throws Exception { + tearDown(); // Stop the Mocked DN started in startup() + + final int blockSize = 4096; + final int numReplicas = 3; + final String filename = "/testIgnoreMinReplication"; + final Path filePath = new Path(filename); + Configuration configuration = new HdfsConfiguration(); + configuration.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000); + configuration.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 2); + configuration.setLong(DFS_BLOCK_SIZE_KEY, blockSize); + MiniDFSCluster cluster = null; + + try { + cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(5) + .build(); + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + final FSNamesystem fsn = cluster.getNamesystem(); + + // Create a file and never close the output stream to trigger recovery + FSDataOutputStream out = dfs.create(filePath, (short) numReplicas); + out.write(AppendTestUtil.randomBytes(0, blockSize)); + out.hsync(); + + DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", + cluster.getNameNodePort()), configuration); + LocatedBlock blk = dfsClient.getNamenode(). + getBlockLocations(filename, 0, blockSize). + getLastLocatedBlock(); + + // Kill 2 out of 3 datanodes so that only 1 alive, thus < minReplication + List dataNodes = Arrays.asList(blk.getLocations()); + assertEquals(dataNodes.size(), numReplicas); + for (DatanodeInfo dataNode : dataNodes.subList(0, numReplicas - 1)) { + cluster.stopDataNode(dataNode.getName()); + } + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return fsn.getNumDeadDataNodes() == 2; + } + }, 300, 300000); + + // Make sure hard lease expires to trigger replica recovery + cluster.setLeasePeriod(100L, 100L); + + // Wait for recovery to succeed + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + return dfs.isFileClosed(filePath); + } catch (IOException e) {} + return false; + } + }, 300, 300000); + + // Wait for the block to be replicated + DFSTestUtil.waitForReplication(cluster, DFSTestUtil.getFirstBlock( + dfs, filePath), 1, numReplicas, 0); + + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } }