HDFS-12725. BlockPlacementPolicyRackFaultTolerant fails with very uneven racks.
parent 6fc09beac4
commit b00f828d84
BlockPlacementPolicyRackFaultTolerant.java

@@ -62,10 +62,17 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
    * randomly.
    * 2. If total replica expected is bigger than numOfRacks, it choose:
    *   2a. Fill each rack exactly (maxNodesPerRack-1) replicas.
-   *   2b. For some random racks, place one more replica to each one of them, until
-   *   numOfReplicas have been chosen. <br>
-   * In the end, the difference of the numbers of replicas for each two racks
-   * is no more than 1.
+   *   2b. For some random racks, place one more replica to each one of them,
+   *   until numOfReplicas have been chosen. <br>
+   * 3. If after step 2, there are still replicas not placed (due to some
+   * racks have fewer datanodes than maxNodesPerRack), the rest of the replicas
+   * is placed evenly on the rest of the racks who have Datanodes that have
+   * not been placed a replica.
+   * 4. If after step 3, there are still replicas not placed. A
+   * {@link NotEnoughReplicasException} is thrown.
+   * <p>
+   * For normal setups, step 2 would suffice. So in the end, the difference
+   * of the numbers of replicas for each two racks is no more than 1.
    * Either way it always prefer local storage.
    * @return local node of writer
    */
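The placement rules in the javadoc above boil down to simple counting: fill every rack up to (maxNodesPerRack - 1), then hand out the remainder one per rack, so any two racks differ by at most one replica. The standalone sketch below illustrates that arithmetic only; it is not the HDFS implementation, the class name is made up, and the maxNodesPerRack value here is a simplified stand-in for the cap the placement policy (BlockPlacementPolicyDefault) actually computes.

import java.util.Arrays;

/** Illustration only: per-rack replica counts aimed for by steps 1-2 above. */
public class EvenRackSpreadSketch {

  static int[] spread(int totalReplicaExpected, int numOfRacks) {
    // Hypothetical cap, roughly ceil(total / racks); the real cap may differ.
    int maxNodesPerRack = (totalReplicaExpected + numOfRacks - 1) / numOfRacks;
    int[] perRack = new int[numOfRacks];
    // Step 2a: fill each rack up to (maxNodesPerRack - 1) replicas.
    Arrays.fill(perRack, Math.min(maxNodesPerRack - 1,
        totalReplicaExpected / numOfRacks));
    int placed = Arrays.stream(perRack).sum();
    // Step 2b: spread the remainder, one extra replica per rack.
    for (int r = 0; placed < totalReplicaExpected; r++, placed++) {
      perRack[r % numOfRacks]++;
    }
    return perRack;
  }

  public static void main(String[] args) {
    // An RS-6-3 block group (9 storages) on 4 racks -> [3, 2, 2, 2].
    System.out.println(Arrays.toString(spread(9, 4)));
  }
}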
@@ -132,24 +139,63 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
       chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
           maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
-      LOG.debug("Only able to place {} of {} (maxNodesPerRack={}) nodes " +
-              "evenly across racks, falling back to uneven placement.",
-          results.size(), numOfReplicas, maxNodesPerRack);
+      LOG.warn("Only able to place {} of total expected {}"
+          + " (maxNodesPerRack={}, numOfReplicas={}) nodes "
+          + "evenly across racks, falling back to evenly place on the "
+          + "remaining racks. This may not guarantee rack-level fault "
+          + "tolerance. Please check if the racks are configured properly.",
+          results.size(), totalReplicaExpected, maxNodesPerRack, numOfReplicas);
       LOG.debug("Caught exception was:", e);
+      chooseEvenlyFromRemainingRacks(writer, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes,
+          totalReplicaExpected, e);
+
+    }
+
+    return writer;
+  }
+
+  /**
+   * Choose as evenly as possible from the racks which have available datanodes.
+   */
+  private void chooseEvenlyFromRemainingRacks(Node writer,
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes, int totalReplicaExpected,
+      NotEnoughReplicasException e) throws NotEnoughReplicasException {
+    int numResultsOflastChoose = 0;
+    NotEnoughReplicasException lastException = e;
+    int bestEffortMaxNodesPerRack = maxNodesPerRack;
+    while (results.size() != totalReplicaExpected &&
+        numResultsOflastChoose != results.size()) {
       // Exclude the chosen nodes
+      final Set<Node> newExcludeNodes = new HashSet<>();
       for (DatanodeStorageInfo resultStorage : results) {
         addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
-            excludedNodes);
+            newExcludeNodes);
       }
 
       LOG.trace("Chosen nodes: {}", results);
       LOG.trace("Excluded nodes: {}", excludedNodes);
-      numOfReplicas = totalReplicaExpected - results.size();
-      chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
-          totalReplicaExpected, results, avoidStaleNodes, storageTypes);
+      LOG.trace("New Excluded nodes: {}", newExcludeNodes);
+      final int numOfReplicas = totalReplicaExpected - results.size();
+      numResultsOflastChoose = results.size();
+      try {
+        chooseOnce(numOfReplicas, writer, newExcludeNodes, blocksize,
+            ++bestEffortMaxNodesPerRack, results, avoidStaleNodes,
+            storageTypes);
+      } catch (NotEnoughReplicasException nere) {
+        lastException = nere;
+      } finally {
+        excludedNodes.addAll(newExcludeNodes);
+      }
     }
 
-    return writer;
+    if (numResultsOflastChoose != totalReplicaExpected) {
+      LOG.debug("Best effort placement failed: expecting {} replicas, only "
+          + "chose {}.", totalReplicaExpected, numResultsOflastChoose);
+      throw lastException;
+    }
   }
 
   /**
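The new chooseEvenlyFromRemainingRacks loop keeps retrying with a per-rack cap that grows by one each round and stops as soon as a round makes no progress, then rethrows the last exception if replicas are still missing. The following standalone simulation sketches that idea over plain rack capacities; it is an illustration only, the class and method names are hypothetical, and the greedy placeOnce merely stands in for chooseOnce.

import java.util.Arrays;

/** Illustration only: best-effort placement with a growing per-rack cap. */
public class BestEffortPlacementSketch {

  /** Greedy stand-in for chooseOnce: fill racks up to 'cap' replicas each. */
  static int placeOnce(int[] capacity, int[] placed, int cap, int wanted) {
    int done = 0;
    for (int r = 0; r < capacity.length && done < wanted; r++) {
      while (placed[r] < Math.min(cap, capacity[r]) && done < wanted) {
        placed[r]++;
        done++;
      }
    }
    return done;
  }

  static int[] bestEffort(int[] capacity, int totalReplicaExpected, int cap) {
    int[] placed = new int[capacity.length];
    int total = placeOnce(capacity, placed, cap, totalReplicaExpected);
    int lastTotal = -1;
    // Same shape as the while loop in the patch: stop on success or no progress.
    while (total != totalReplicaExpected && total != lastTotal) {
      lastTotal = total;
      total += placeOnce(capacity, placed, ++cap, totalReplicaExpected - total);
    }
    return placed;
  }

  public static void main(String[] args) {
    // 2 racks, one with 1 datanode and one with 8; 9 replicas wanted.
    // Even placement under a small cap cannot finish, the fallback can.
    System.out.println(Arrays.toString(bestEffort(new int[]{1, 8}, 9, 5)));
  }
}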
TestErasureCodingMultipleRacks.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
@@ -34,6 +35,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test erasure coding block placement with skewed # nodes per rack.
  */
@@ -42,10 +50,10 @@ public class TestErasureCodingMultipleRacks {
       LoggerFactory.getLogger(TestErasureCodingMultipleRacks.class);
 
   static {
-    GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(BlockPlacementPolicyDefault.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(BlockPlacementPolicyDefault.LOG, Level.TRACE);
     GenericTestUtils.setLogLevel(BlockPlacementPolicyRackFaultTolerant.LOG,
-        Level.DEBUG);
+        Level.TRACE);
     GenericTestUtils.setLogLevel(NetworkTopology.LOG, Level.DEBUG);
   }
 
@@ -62,20 +70,38 @@ public class TestErasureCodingMultipleRacks {
   private DistributedFileSystem dfs;
 
   @Before
-  public void setup() throws Exception {
+  public void setup() {
     ecPolicy = getPolicy();
-    final int dataUnits = ecPolicy.getNumDataUnits();
-    final int parityUnits = ecPolicy.getNumParityUnits();
-    final int numDatanodes = dataUnits + parityUnits;
-    final int numRacks = 2;
+    conf = new HdfsConfiguration();
+    // disable load consideration to test placement only.
+    conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, false);
+  }
+
+  /**
+   * Setup cluster with desired number of DN, racks, and specified number of
+   * rack that only has 1 DN. Other racks will be evenly setup with the number
+   * of DNs.
+   * <p>
+   * This is not done as a {@link Before}, so test cases can setup differently.
+   *
+   * @param numDatanodes number of total Datanodes.
+   * @param numRacks number of total racks
+   * @param numSingleDnRacks number of racks that only has 1 DN
+   * @throws Exception
+   */
+  public void setupCluster(final int numDatanodes, final int numRacks,
+      final int numSingleDnRacks) throws Exception {
+    assert numDatanodes > numRacks;
+    assert numRacks > numSingleDnRacks;
+    assert numSingleDnRacks >= 0;
     final String[] racks = new String[numDatanodes];
-    for (int i = 0; i < numRacks; i++) {
+    for (int i = 0; i < numSingleDnRacks; i++) {
       racks[i] = "/rack" + i;
     }
-    for (int i = numRacks; i < numDatanodes; i++) {
-      racks[i] = "/rack" + (numRacks - 1);
+    for (int i = numSingleDnRacks; i < numDatanodes; i++) {
+      racks[i] =
+          "/rack" + (numSingleDnRacks + (i % (numRacks - numSingleDnRacks)));
     }
-    conf = new HdfsConfiguration();
     cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(numDatanodes)
         .racks(racks)
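For a concrete picture of the rack strings setupCluster builds, the sketch below reuses its two loops outside the test; the example arguments mirror what testSkewedRack1 passes for RS-6-3 (one single-DN rack plus one rack holding the remaining eight datanodes). The class name is illustrative only, not part of the patch.

import java.util.Arrays;

/** Illustration only: the rack layout produced by setupCluster's loops. */
public class RackLayoutSketch {

  static String[] layout(int numDatanodes, int numRacks, int numSingleDnRacks) {
    final String[] racks = new String[numDatanodes];
    // First numSingleDnRacks racks get exactly one datanode each.
    for (int i = 0; i < numSingleDnRacks; i++) {
      racks[i] = "/rack" + i;
    }
    // Remaining datanodes are spread round-robin over the remaining racks.
    for (int i = numSingleDnRacks; i < numDatanodes; i++) {
      racks[i] =
          "/rack" + (numSingleDnRacks + (i % (numRacks - numSingleDnRacks)));
    }
    return racks;
  }

  public static void main(String[] args) {
    // [/rack0, /rack1, /rack1, /rack1, /rack1, /rack1, /rack1, /rack1, /rack1]
    System.out.println(Arrays.toString(layout(9, 2, 1)));
  }
}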
@@ -92,16 +118,89 @@ public class TestErasureCodingMultipleRacks {
     }
   }
 
   // Extreme case.
   @Test
-  public void testSkewedRack() throws Exception {
-    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy
-        .getCellSize();
+  public void testSkewedRack1() throws Exception {
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+    setupCluster(dataUnits + parityUnits, 2, 1);
+
+    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
     byte[] contents = new byte[filesize];
 
     for (int i = 0; i < 10; i++) {
       final Path path = new Path("/testfile");
       LOG.info("Writing file " + path);
       DFSTestUtil.writeFile(dfs, path, contents);
+      BlockLocation[] blocks = dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
+      assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
+          blocks[0].getHosts().length);
     }
   }
 
+  // 1 rack has many nodes, other racks have single node. Extreme case.
+  @Test
+  public void testSkewedRack2() throws Exception {
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+    setupCluster(dataUnits + parityUnits * 2, dataUnits, dataUnits - 1);
+
+    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
+    byte[] contents = new byte[filesize];
+
+    final Path path = new Path("/testfile");
+    LOG.info("Writing file " + path);
+    DFSTestUtil.writeFile(dfs, path, contents);
+    BlockLocation[] blocks = dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
+    assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
+        blocks[0].getHosts().length);
+  }
+
+  // 2 racks have sufficient nodes, other racks has 1. Should be able to
+  // tolerate 1 rack failure.
+  @Test
+  public void testSkewedRack3() throws Exception {
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+    // Create enough extra DNs on the 2 racks to test even placement.
+    // Desired placement is parityUnits replicas on the 2 racks, and 1 replica
+    // on the rest of the racks (which only have 1 DN)
+    setupCluster(dataUnits + parityUnits * 4, dataUnits - parityUnits + 2,
+        dataUnits - parityUnits);
+
+    final int filesize = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
+    byte[] contents = new byte[filesize];
+
+    for (int i = 0; i < 10; ++i) {
+      final Path path = new Path("/testfile" + i);
+      LOG.info("Writing file " + path);
+      DFSTestUtil.writeFile(dfs, path, contents);
+      BlockLocation[] blocks =
+          dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
+      assertEquals(ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(),
+          blocks[0].getHosts().length);
+      assertRackFailureTolerated(blocks[0].getTopologyPaths());
+    }
+  }
+
+  // Verifies that no more than numParityUnits is placed on a rack.
+  private void assertRackFailureTolerated(final String[] topologies) {
+    final Map<String, Integer> racksCount = new HashMap<>();
+    for (String t : topologies) {
+      final Integer count = racksCount.get(getRackName(t));
+      if (count == null) {
+        racksCount.put(getRackName(t), 1);
+      } else {
+        racksCount.put(getRackName(t), count + 1);
+      }
+    }
+    LOG.info("Rack count map is: {}", racksCount);
+
+    for (Integer count : racksCount.values()) {
+      assertTrue(count <= ecPolicy.getNumParityUnits());
+    }
+  }
+
+  private String getRackName(final String topology) {
+    assert topology.indexOf('/', 1) > 0;
+    return topology.substring(0, topology.indexOf('/', 1));
+  }
 }
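The assertRackFailureTolerated check in the test relies on a standard erasure coding property: an RS(d, p) block group can be rebuilt from any d of its d + p storages, so it tolerates losing at most p of them; if no rack holds more than p storages of the group, losing a single rack is always recoverable. A small illustrative check of that arithmetic (hypothetical class, not part of the patch):

/** Illustration only: why "at most parityUnits per rack" tolerates one rack failure. */
public class RackFaultToleranceCheck {

  /** True if losing the most loaded rack still leaves at least d storages. */
  static boolean survivesOneRackFailure(int[] storagesPerRack, int parityUnits) {
    int worstRack = 0;
    for (int count : storagesPerRack) {
      worstRack = Math.max(worstRack, count);
    }
    return worstRack <= parityUnits;
  }

  public static void main(String[] args) {
    // RS-6-3: the layout [3, 3, 1, 1, 1] aimed for by testSkewedRack3 is fine,
    // while [4, 4, 1] would lose too many storages if a big rack dies.
    System.out.println(survivesOneRackFailure(new int[]{3, 3, 1, 1, 1}, 3)); // true
    System.out.println(survivesOneRackFailure(new int[]{4, 4, 1}, 3));       // false
  }
}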