HDFS-12886. Ignore minReplication for block recovery. Contributed by Lukas Majercak.
(cherry picked from commit 08ff1586d5
)
This commit is contained in:
parent
7c286fe804
commit
c6b1125a24
|
@ -994,6 +994,13 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
addExpectedReplicasToPending(lastBlock);
|
||||
}
|
||||
completeBlock(lastBlock, iip, false);
|
||||
} else if (pendingRecoveryBlocks.isUnderRecovery(lastBlock)) {
|
||||
// We've just finished recovery for this block, complete
|
||||
// the block forcibly disregarding number of replicas.
|
||||
// This is to ignore minReplication, the block will be closed
|
||||
// and then replicated out.
|
||||
completeBlock(lastBlock, iip, true);
|
||||
updateNeededReconstructions(lastBlock, 1, 0);
|
||||
}
|
||||
return committed;
|
||||
}
|
||||
|
|
|
@ -19,9 +19,14 @@
|
|||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
|
@ -42,6 +47,7 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -1142,4 +1148,81 @@ public class TestBlockRecovery {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that block will be recovered even if there are less than the
|
||||
* specified minReplication datanodes involved in its recovery.
|
||||
*
|
||||
* Check that, after recovering, the block will be successfully replicated.
|
||||
*/
|
||||
@Test(timeout = 300000L)
|
||||
public void testRecoveryWillIgnoreMinReplication() throws Exception {
|
||||
tearDown(); // Stop the Mocked DN started in startup()
|
||||
|
||||
final int blockSize = 4096;
|
||||
final int numReplicas = 3;
|
||||
final String filename = "/testIgnoreMinReplication";
|
||||
final Path filePath = new Path(filename);
|
||||
Configuration configuration = new HdfsConfiguration();
|
||||
configuration.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
|
||||
configuration.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
|
||||
configuration.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
MiniDFSCluster cluster = null;
|
||||
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(5)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final FSNamesystem fsn = cluster.getNamesystem();
|
||||
|
||||
// Create a file and never close the output stream to trigger recovery
|
||||
FSDataOutputStream out = dfs.create(filePath, (short) numReplicas);
|
||||
out.write(AppendTestUtil.randomBytes(0, blockSize));
|
||||
out.hsync();
|
||||
|
||||
DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
|
||||
cluster.getNameNodePort()), configuration);
|
||||
LocatedBlock blk = dfsClient.getNamenode().
|
||||
getBlockLocations(filename, 0, blockSize).
|
||||
getLastLocatedBlock();
|
||||
|
||||
// Kill 2 out of 3 datanodes so that only 1 alive, thus < minReplication
|
||||
List<DatanodeInfo> dataNodes = Arrays.asList(blk.getLocations());
|
||||
assertEquals(dataNodes.size(), numReplicas);
|
||||
for (DatanodeInfo dataNode : dataNodes.subList(0, numReplicas - 1)) {
|
||||
cluster.stopDataNode(dataNode.getName());
|
||||
}
|
||||
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return fsn.getNumDeadDataNodes() == 2;
|
||||
}
|
||||
}, 300, 300000);
|
||||
|
||||
// Make sure hard lease expires to trigger replica recovery
|
||||
cluster.setLeasePeriod(100L, 100L);
|
||||
|
||||
// Wait for recovery to succeed
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
try {
|
||||
return dfs.isFileClosed(filePath);
|
||||
} catch (IOException e) {}
|
||||
return false;
|
||||
}
|
||||
}, 300, 300000);
|
||||
|
||||
// Wait for the block to be replicated
|
||||
DFSTestUtil.waitForReplication(cluster, DFSTestUtil.getFirstBlock(
|
||||
dfs, filePath), 1, numReplicas, 0);
|
||||
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue