HDFS-3256. HDFS considers blocks under-replicated if topology script is configured with only 1 rack. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1325531 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2012-04-12 21:28:44 +00:00
parent 07a4367445
commit 4f230adc13
5 changed files with 80 additions and 5 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -506,6 +506,9 @@ Release 2.0.0 - UNRELEASED
 
     HDFS-3255. HA DFS returns wrong token service (Daryn Sharp via todd)
 
+    HDFS-3256. HDFS considers blocks under-replicated if topology script is
+    configured with only 1 rack. (atm)
+
   BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
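
For context, triggering the bug required nothing more than pointing the NameNode at a topology script while every DataNode still resolved to a single rack. A minimal repro sketch in Java (the script path is hypothetical; the configuration key is the one used in the BlockManager hunk below):

    // Hypothetical repro: any value for the topology script key used to
    // enable the "enough racks" check, even if the script mapped every
    // host to the same rack.
    Configuration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
        "/etc/hadoop/topology.sh"); // illustrative path
    // Before this patch, every multi-replica block on the resulting
    // single-rack cluster was then reported as under-replicated.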

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -247,8 +247,7 @@ public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
     this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
                                              DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
-    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
-                                                                                                       : true;
+    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
 
     this.replicationRecheckInterval =
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
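
The removed ternary and its one-line replacement are behaviorally identical; a quick sketch of the identity, using the same configuration key as above:

    String script = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY);
    boolean viaTernary = script == null ? false : true; // old form
    boolean direct = script != null;                    // new form
    assert viaTernary == direct; // `x == null ? false : true` is just `x != null`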
@@ -2829,7 +2828,9 @@ boolean blockHasEnoughRacks(Block b) {
       DatanodeDescriptor cur = it.next();
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
         if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          if (numExpectedReplicas == 1) {
+          if (numExpectedReplicas == 1 ||
+              (numExpectedReplicas > 1 &&
+                !datanodeManager.hasClusterEverBeenMultiRack())) {
             enoughRacks = true;
             break;
           }
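
The widened condition treats one rack as sufficient either when only a single replica is expected or when the cluster has never contained a second rack to replicate to; the `numExpectedReplicas > 1` guard is implied once the first test fails, so it is purely defensive. A standalone sketch of the predicate (method and parameter names are illustrative, not the BlockManager API):

    // Illustrative predicate mirroring the condition added above.
    static boolean oneRackIsEnough(int numExpectedReplicas,
                                   boolean clusterEverMultiRack) {
      // A single expected replica can never span racks, and a cluster that
      // has never had a second rack has nowhere else to place replicas, so
      // neither case should count as a replication deficit.
      return numExpectedReplicas == 1
          || (numExpectedReplicas > 1 && !clusterEverMultiRack);
    }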

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -71,6 +71,7 @@
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.net.InetAddresses;
 
 /**
@@ -126,6 +127,12 @@ public class DatanodeManager {
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
 
+  /**
+   * Whether or not this cluster has ever consisted of more than 1 rack,
+   * according to the NetworkTopology.
+   */
+  private boolean hasClusterEverBeenMultiRack = false;
+
   DatanodeManager(final BlockManager blockManager,
       final Namesystem namesystem, final Configuration conf
       ) throws IOException {
@@ -331,6 +338,7 @@ private void addDatanode(final DatanodeDescriptor node) {
     host2DatanodeMap.add(node);
     networktopology.add(node);
+    checkIfClusterIsNowMultiRack(node);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".addDatanode: "
@@ -768,6 +776,42 @@ public void fetchDatanodes(final List<DatanodeDescriptor> live,
     }
   }
 
+  /**
+   * @return true if this cluster has ever consisted of multiple racks, even if
+   *         it is not now a multi-rack cluster.
+   */
+  boolean hasClusterEverBeenMultiRack() {
+    return hasClusterEverBeenMultiRack;
+  }
+
+  /**
+   * Check if the cluster now consists of multiple racks. If it does, and this
+   * is the first time it's consisted of multiple racks, then process blocks
+   * that may now be misreplicated.
+   *
+   * @param node DN which caused cluster to become multi-rack. Used for logging.
+   */
+  @VisibleForTesting
+  void checkIfClusterIsNowMultiRack(DatanodeDescriptor node) {
+    if (!hasClusterEverBeenMultiRack && networktopology.getNumOfRacks() > 1) {
+      String message = "DN " + node + " joining cluster has expanded a formerly " +
+          "single-rack cluster to be multi-rack. ";
+      if (namesystem.isPopulatingReplQueues()) {
+        message += "Re-checking all blocks for replication, since they should " +
+            "now be replicated cross-rack";
+        LOG.info(message);
+      } else {
+        message += "Not checking for mis-replicated blocks because this NN is " +
+            "not yet processing repl queues.";
+        LOG.debug(message);
+      }
+      hasClusterEverBeenMultiRack = true;
+      if (namesystem.isPopulatingReplQueues()) {
+        blockManager.processMisReplicatedBlocks();
+      }
+    }
+  }
+
   /**
    * Parse a DatanodeID from a hosts file entry
    * @param hostLine of form [hostname|ip][:port]?
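
Note that hasClusterEverBeenMultiRack is a one-way latch: once a second rack has ever been observed, the flag stays set even if the cluster later shrinks back to one rack. A usage sketch (assuming a BlockManager bm, as in the tests below, and hypothetical descriptors nodeOnRackA/nodeOnRackB already registered in the NetworkTopology):

    DatanodeManager dm = bm.getDatanodeManager();
    dm.checkIfClusterIsNowMultiRack(nodeOnRackA); // still one rack: latch stays false
    assert !dm.hasClusterEverBeenMultiRack();
    dm.checkIfClusterIsNowMultiRack(nodeOnRackB); // second rack appears: latch flips
    assert dm.hasClusterEverBeenMultiRack();      // and, if the NN is populating
                                                  // repl queues, a full scan runs
    // Removing nodeOnRackB later would NOT reset the latch.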

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -92,6 +92,7 @@ private void addNodes(Iterable<DatanodeDescriptor> nodesToAdd) {
       dn.updateHeartbeat(
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+      bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
     }
   }
@@ -310,6 +311,32 @@ private void doTestSufficientlyReplBlocksUsesNewRack(int testIndex) {
         rackB.contains(pipeline[1]));
   }
 
+  @Test
+  public void testBlocksAreNotUnderreplicatedInSingleRack() throws Exception {
+    List<DatanodeDescriptor> nodes = ImmutableList.of(
+        new DatanodeDescriptor(new DatanodeID("h1", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h2", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h3", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h4", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h5", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h6", 5020), "/rackA")
+      );
+    addNodes(nodes);
+    List<DatanodeDescriptor> origNodes = nodes.subList(0, 3);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestSingleRackClusterIsSufficientlyReplicated(i, origNodes);
+    }
+  }
+
+  private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex,
+      List<DatanodeDescriptor> origNodes)
+      throws Exception {
+    assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    addBlockOnNodes((long)testIndex, origNodes);
+    bm.processMisReplicatedBlocks();
+    assertEquals(0, bm.numOfUnderReplicatedBlocks());
+  }
+
   /**
    * Tell the block manager that replication is completed for the given
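
The new test is also a compact illustration of why the old check failed: three replicas of a block on a six-node, single-rack cluster made `numExpectedReplicas == 1` false, so blockHasEnoughRacks() queued the block even though no second rack existed. A hedged before/after sketch of that evaluation:

    int numExpectedReplicas = 3;   // block replicated on 3 of the 6 nodes
    boolean everMultiRack = false; // all nodes on /rackA
    boolean oldCheck = numExpectedReplicas == 1;                   // false -> queued
    boolean newCheck = numExpectedReplicas == 1 || !everMultiRack; // true  -> not queued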

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java

@@ -97,7 +97,7 @@ public void testSufficientlyReplBlocksUsesNewRack() throws Exception {
       final FileSystem fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
       ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
-      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
 
       // Add a new datanode on a different rack
       String newRacks[] = {"/rack2"};
@@ -165,7 +165,7 @@ public void testUnderReplicatedUsesNewRacks() throws Exception {
       final FileSystem fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
       ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
-      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
 
       // Add new datanodes on a different rack and increase the
       // replication factor so the block is underreplicated and make