HDFS-3256. HDFS considers blocks under-replicated if topology script is configured with only 1 rack. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1325532 13f79535-47bb-0310-9956-ffa450edef68
parent c94cfd8179
commit e3bef05fcc
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -386,6 +386,9 @@ Release 2.0.0 - UNRELEASED
 
     HDFS-3255. HA DFS returns wrong token service (Daryn Sharp via todd)
 
+    HDFS-3256. HDFS considers blocks under-replicated if topology script is
+    configured with only 1 rack. (atm)
+
     BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -249,8 +249,7 @@ public class BlockManager {
 
     this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
                                              DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
-    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
-                                                                             : true;
+    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
 
     this.replicationRecheckInterval =
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
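The rewritten assignment above is driven entirely by whether a topology script is configured. A minimal standalone sketch of the same predicate, assuming an HdfsConfiguration and a hypothetical script path:

    // Sketch only: the flag mirrors whether net.topology.script.file.name is
    // set; the script path below is hypothetical.
    Configuration conf = new HdfsConfiguration();
    boolean noScript = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;  // false
    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "/etc/hadoop/topology.sh");
    boolean hasScript = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null; // true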
@@ -2831,7 +2830,9 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       DatanodeDescriptor cur = it.next();
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
         if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          if (numExpectedReplicas == 1) {
+          if (numExpectedReplicas == 1 ||
+              (numExpectedReplicas > 1 &&
+                !datanodeManager.hasClusterEverBeenMultiRack())) {
             enoughRacks = true;
             break;
           }
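Restated outside the loop, the new condition treats a block's rack spread as sufficient when only one replica is expected, or when the cluster has never contained a second rack, in which case cross-rack placement was never possible. A minimal sketch, with numExpectedReplicas and datanodeManager standing in for the enclosing BlockManager state:

    // Equivalent form of the revised condition above.
    boolean enoughRacks =
        numExpectedReplicas == 1 ||
        (numExpectedReplicas > 1 && !datanodeManager.hasClusterEverBeenMultiRack());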
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.net.InetAddresses;
 
 /**
@@ -126,6 +127,12 @@ public class DatanodeManager {
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
 
+  /**
+   * Whether or not this cluster has ever consisted of more than 1 rack,
+   * according to the NetworkTopology.
+   */
+  private boolean hasClusterEverBeenMultiRack = false;
+
   DatanodeManager(final BlockManager blockManager,
       final Namesystem namesystem, final Configuration conf
       ) throws IOException {
@@ -331,6 +338,7 @@ public class DatanodeManager {
 
     host2DatanodeMap.add(node);
     networktopology.add(node);
+    checkIfClusterIsNowMultiRack(node);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".addDatanode: "
@@ -768,6 +776,42 @@ public class DatanodeManager {
     }
   }
 
+  /**
+   * @return true if this cluster has ever consisted of multiple racks, even if
+   *         it is not now a multi-rack cluster.
+   */
+  boolean hasClusterEverBeenMultiRack() {
+    return hasClusterEverBeenMultiRack;
+  }
+
+  /**
+   * Check if the cluster now consists of multiple racks. If it does, and this
+   * is the first time it's consisted of multiple racks, then process blocks
+   * that may now be misreplicated.
+   *
+   * @param node DN which caused cluster to become multi-rack. Used for logging.
+   */
+  @VisibleForTesting
+  void checkIfClusterIsNowMultiRack(DatanodeDescriptor node) {
+    if (!hasClusterEverBeenMultiRack && networktopology.getNumOfRacks() > 1) {
+      String message = "DN " + node + " joining cluster has expanded a formerly " +
+          "single-rack cluster to be multi-rack. ";
+      if (namesystem.isPopulatingReplQueues()) {
+        message += "Re-checking all blocks for replication, since they should " +
+            "now be replicated cross-rack";
+        LOG.info(message);
+      } else {
+        message += "Not checking for mis-replicated blocks because this NN is " +
+            "not yet processing repl queues.";
+        LOG.debug(message);
+      }
+      hasClusterEverBeenMultiRack = true;
+      if (namesystem.isPopulatingReplQueues()) {
+        blockManager.processMisReplicatedBlocks();
+      }
+    }
+  }
+
   /**
    * Parse a DatanodeID from a hosts file entry
    * @param hostLine of form [hostname|ip][:port]?
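The new flag behaves as a one-way latch: it flips to true the first time NetworkTopology reports a second rack and is never reset, so a cluster that later shrinks back to one rack keeps enforcing cross-rack replication. A usage sketch under that reading, where dm and newNodeOnRack2 are hypothetical test scaffolding:

    // dm is a DatanodeManager for a cluster that has only ever had one rack.
    assert !dm.hasClusterEverBeenMultiRack();

    // A DN on a second rack registers; addDatanode() invokes the hook, the
    // topology now reports two racks, and the latch sets.
    dm.checkIfClusterIsNowMultiRack(newNodeOnRack2);
    assert dm.hasClusterEverBeenMultiRack();

    // Even if the second rack later empties out, the latch stays set.
    assert dm.hasClusterEverBeenMultiRack();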
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -92,6 +92,7 @@ public class TestBlockManager {
       dn.updateHeartbeat(
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+      bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
     }
   }
 
@@ -310,6 +311,32 @@ public class TestBlockManager {
         rackB.contains(pipeline[1]));
   }
 
+  @Test
+  public void testBlocksAreNotUnderreplicatedInSingleRack() throws Exception {
+    List<DatanodeDescriptor> nodes = ImmutableList.of(
+        new DatanodeDescriptor(new DatanodeID("h1", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h2", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h3", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h4", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h5", 5020), "/rackA"),
+        new DatanodeDescriptor(new DatanodeID("h6", 5020), "/rackA")
+      );
+    addNodes(nodes);
+    List<DatanodeDescriptor> origNodes = nodes.subList(0, 3);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestSingleRackClusterIsSufficientlyReplicated(i, origNodes);
+    }
+  }
+
+  private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex,
+      List<DatanodeDescriptor> origNodes)
+      throws Exception {
+    assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    addBlockOnNodes((long)testIndex, origNodes);
+    bm.processMisReplicatedBlocks();
+    assertEquals(0, bm.numOfUnderReplicatedBlocks());
+  }
+
   /**
    * Tell the block manager that replication is completed for the given
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
@@ -97,7 +97,7 @@ public class TestBlocksWithNotEnoughRacks {
     final FileSystem fs = cluster.getFileSystem();
     DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
     ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
-    DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+    DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
 
     // Add a new datanode on a different rack
     String newRacks[] = {"/rack2"};
@@ -165,7 +165,7 @@ public class TestBlocksWithNotEnoughRacks {
     final FileSystem fs = cluster.getFileSystem();
     DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
     ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
-    DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+    DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
 
     // Add new datanodes on a different rack and increase the
     // replication factor so the block is underreplicated and make
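In both tests the final argument to DFSTestUtil.waitForReplication drops from 1 to 0. Reading the trailing arguments as racks, replicas, and blocks still needing replication (parameter names inferred from the tests' usage, not quoted from DFSTestUtil), the expected steady state for a healthy single-rack block is now:

    // One rack, REPLICATION_FACTOR live replicas, and nothing queued for
    // re-replication: under the fix a single-rack cluster is not penalized.
    DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);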