HDFS-14637. Namenode may not replicate blocks to meet the policy after enabling upgradeDomain. Contributed by Stephen O'Donnell.

Reviewed-by: Ayush Saxena <ayushsaxena@apache.org>
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Stephen O'Donnell 2019-10-03 22:12:27 -07:00 committed by Wei-Chiu Chuang
parent 844b766da5
commit c99a12167f
11 changed files with 362 additions and 35 deletions

View File

@ -2009,6 +2009,7 @@ public class BlockManager implements BlockStatsMXBean {
(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
}
@VisibleForTesting
BlockReconstructionWork scheduleReconstruction(BlockInfo block,
int priority) {
// skip abandoned block or block reopened for append
@ -2053,7 +2054,9 @@ public class BlockManager implements BlockStatsMXBean {
additionalReplRequired = requiredRedundancy - numReplicas.liveReplicas()
- pendingNum;
} else {
additionalReplRequired = 1; // Needed on a new rack
// Violates placement policy. Needed on a new rack or domain etc.
BlockPlacementStatus placementStatus = getBlockPlacementStatus(block);
additionalReplRequired = placementStatus.getAdditionalReplicasRequired();
}
final BlockCollection bc = getBlockCollection(block);
@ -2086,20 +2089,6 @@ public class BlockManager implements BlockStatsMXBean {
}
}
private boolean isInNewRack(DatanodeDescriptor[] srcs,
DatanodeDescriptor target) {
LOG.debug("check if target {} increases racks, srcs={}", target,
Arrays.asList(srcs));
for (DatanodeDescriptor src : srcs) {
if (!src.isDecommissionInProgress() &&
src.getNetworkLocation().equals(target.getNetworkLocation())) {
LOG.debug("the target {} is in the same rack with src {}", target, src);
return false;
}
}
return true;
}
private boolean validateReconstructionWork(BlockReconstructionWork rw) {
BlockInfo block = rw.getBlock();
int priority = rw.getPriority();
@ -2125,10 +2114,16 @@ public class BlockManager implements BlockStatsMXBean {
}
DatanodeStorageInfo[] targets = rw.getTargets();
BlockPlacementStatus placementStatus = getBlockPlacementStatus(block);
if ((numReplicas.liveReplicas() >= requiredRedundancy) &&
(!isPlacementPolicySatisfied(block)) ) {
if (!isInNewRack(rw.getSrcNodes(), targets[0].getDatanodeDescriptor())) {
// No use continuing, unless a new rack in this case
(!placementStatus.isPlacementPolicySatisfied())) {
BlockPlacementStatus newPlacementStatus =
getBlockPlacementStatus(block, targets);
if (!newPlacementStatus.isPlacementPolicySatisfied() &&
(newPlacementStatus.getAdditionalReplicasRequired() >=
placementStatus.getAdditionalReplicasRequired())) {
// If the new targets do not meet the placement policy, or at least
// reduce the number of replicas needed, then no use continuing.
return false;
}
// mark that the reconstruction work is to replicate internal block to a
@ -4562,7 +4557,25 @@ public class BlockManager implements BlockStatsMXBean {
}
boolean isPlacementPolicySatisfied(BlockInfo storedBlock) {
return getBlockPlacementStatus(storedBlock, null)
.isPlacementPolicySatisfied();
}
BlockPlacementStatus getBlockPlacementStatus(BlockInfo storedBlock) {
return getBlockPlacementStatus(storedBlock, null);
}
BlockPlacementStatus getBlockPlacementStatus(BlockInfo storedBlock,
DatanodeStorageInfo[] additionalStorage) {
List<DatanodeDescriptor> liveNodes = new ArrayList<>();
if (additionalStorage != null) {
// additionalNodes, are potential new targets for the block. If there are
// any passed, include them when checking the placement policy to see if
// the policy is met, when it may not have been met without these nodes.
for (DatanodeStorageInfo s : additionalStorage) {
liveNodes.add(getDatanodeDescriptorFromStorage(s));
}
}
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(storedBlock);
for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
@ -4570,8 +4583,23 @@ public class BlockManager implements BlockStatsMXBean {
&& storage.getState() == State.NORMAL) {
// assume the policy is satisfied for blocks on PROVIDED storage
// as long as the storage is in normal state.
return new BlockPlacementStatus() {
@Override
public boolean isPlacementPolicySatisfied() {
return true;
}
@Override
public String getErrorDescription() {
return null;
}
@Override
public int getAdditionalReplicasRequired() {
return 0;
}
};
}
final DatanodeDescriptor cur = getDatanodeDescriptorFromStorage(storage);
// Nodes under maintenance should be counted as valid replicas from
// rack policy point of view.
@ -4586,8 +4614,7 @@ public class BlockManager implements BlockStatsMXBean {
.getPolicy(blockType);
int numReplicas = blockType == STRIPED ? ((BlockInfoStriped) storedBlock)
.getRealTotalBlockNum() : storedBlock.getReplication();
return placementPolicy.verifyBlockPlacement(locs, numReplicas)
.isPlacementPolicySatisfied();
return placementPolicy.verifyBlockPlacement(locs, numReplicas);
}
boolean isNeededReconstructionForMaintenance(BlockInfo storedBlock,

View File

@ -39,4 +39,12 @@ public interface BlockPlacementStatus {
*/
public String getErrorDescription();
/**
* Return the number of additional replicas needed to ensure the block
* placement policy is satisfied.
* @return The number of new replicas needed to satisify the placement policy
* or zero if no extra are needed
*/
int getAdditionalReplicasRequired();
}

View File

@ -45,4 +45,12 @@ public class BlockPlacementStatusDefault implements BlockPlacementStatus {
" more rack(s). Total number of racks in the cluster: " + totalRacks;
}
@Override
public int getAdditionalReplicasRequired() {
if (isPlacementPolicySatisfied()) {
return 0;
} else {
return requiredRacks - currentRacks;
}
}
}

View File

@ -78,4 +78,15 @@ public class BlockPlacementStatusWithNodeGroup implements BlockPlacementStatus {
}
return errorDescription.toString();
}
@Override
public int getAdditionalReplicasRequired() {
if (isPlacementPolicySatisfied()) {
return 0;
} else {
int parent = parentBlockPlacementStatus.getAdditionalReplicasRequired();
int child = requiredNodeGroups - currentNodeGroups.size();
return Math.max(parent, child);
}
}
}

View File

@ -85,4 +85,24 @@ public class BlockPlacementStatusWithUpgradeDomain implements
}
return errorDescription.toString();
}
@Override
public int getAdditionalReplicasRequired() {
if (isPlacementPolicySatisfied()) {
return 0;
} else {
// It is possible for a block to have the correct number of upgrade
// domains, but only a single rack, or be on multiple racks, but only in
// one upgrade domain.
int parent = parentBlockPlacementStatus.getAdditionalReplicasRequired();
int child;
if (numberOfReplicas <= upgradeDomainFactor) {
child = numberOfReplicas - upgradeDomains.size();
} else {
child = upgradeDomainFactor - upgradeDomains.size();
}
return Math.max(parent, child);
}
}
}

View File

@ -556,17 +556,24 @@ public class DFSTestUtil {
}
}
public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
int racks, int replicas, int neededReplicas)
throws TimeoutException, InterruptedException {
waitForReplication(cluster, b, racks, replicas, neededReplicas, 0);
}
/*
* Wait up to 20s for the given block to be replicated across
* the requested number of racks, with the requested number of
* replicas, and the requested number of replicas still needed.
*/
public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
int racks, int replicas, int neededReplicas)
int racks, int replicas, int neededReplicas, int neededDomains)
throws TimeoutException, InterruptedException {
int curRacks = 0;
int curReplicas = 0;
int curNeededReplicas = 0;
int curDomains = 0;
int count = 0;
final int ATTEMPTS = 20;
@ -577,17 +584,21 @@ public class DFSTestUtil {
curRacks = r[0];
curReplicas = r[1];
curNeededReplicas = r[2];
curDomains = r[3];
count++;
} while ((curRacks != racks ||
curReplicas != replicas ||
curNeededReplicas != neededReplicas) && count < ATTEMPTS);
curNeededReplicas != neededReplicas ||
(neededDomains != 0 && curDomains != neededDomains))
&& count < ATTEMPTS);
if (count == ATTEMPTS) {
throw new TimeoutException("Timed out waiting for replication."
+ " Needed replicas = "+neededReplicas
+ " Cur needed replicas = "+curNeededReplicas
+ " Replicas = "+replicas+" Cur replicas = "+curReplicas
+ " Racks = "+racks+" Cur racks = "+curRacks);
+ " Racks = "+racks+" Cur racks = "+curRacks
+ " Domains = "+neededDomains+" Cur domains = "+curDomains);
}
}

View File

@ -81,7 +81,8 @@ public class BlockManagerTestUtil {
/**
* @return a tuple of the replica state (number racks, number live
* replicas, and number needed replicas) for the given block.
* replicas, number needed replicas and number of UpgradeDomains) for the
* given block.
*/
public static int[] getReplicaInfo(final FSNamesystem namesystem, final Block b) {
final BlockManager bm = namesystem.getBlockManager();
@ -90,7 +91,8 @@ public class BlockManagerTestUtil {
final BlockInfo storedBlock = bm.getStoredBlock(b);
return new int[]{getNumberOfRacks(bm, b),
bm.countNodes(storedBlock).liveReplicas(),
bm.neededReconstruction.contains(storedBlock) ? 1 : 0};
bm.neededReconstruction.contains(storedBlock) ? 1 : 0,
getNumberOfDomains(bm, b)};
} finally {
namesystem.readUnlock();
}
@ -120,6 +122,30 @@ public class BlockManagerTestUtil {
return rackSet.size();
}
/**
* @return the number of UpgradeDomains over which a given block is replicated
* decommissioning/decommissioned nodes are not counted. corrupt replicas
* are also ignored.
*/
private static int getNumberOfDomains(final BlockManager blockManager,
final Block b) {
final Set<String> domSet = new HashSet<String>(0);
final Collection<DatanodeDescriptor> corruptNodes =
getCorruptReplicas(blockManager).getNodes(b);
for(DatanodeStorageInfo storage : blockManager.blocksMap.getStorages(b)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
if ((corruptNodes == null) || !corruptNodes.contains(cur)) {
String domain = cur.getUpgradeDomain();
if (domain != null && !domSet.contains(domain)) {
domSet.add(domain);
}
}
}
}
return domSet.size();
}
/**
* @return redundancy monitor thread instance from block manager.
*/

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import org.junit.Test;
/**
* Unit tests to validate the BlockPlacementStatusDefault policy, focusing on
* the getAdditionAlReplicasRequired method.
*/
public class TestBlockPlacementStatusDefault {
@Test
public void testIsPolicySatisfiedCorrectly() {
// 2 current racks and 2 expected
BlockPlacementStatusDefault bps =
new BlockPlacementStatusDefault(2, 2, 5);
assertTrue(bps.isPlacementPolicySatisfied());
assertEquals(0, bps.getAdditionalReplicasRequired());
// 1 current rack and 2 expected
bps =
new BlockPlacementStatusDefault(1, 2, 5);
assertFalse(bps.isPlacementPolicySatisfied());
assertEquals(1, bps.getAdditionalReplicasRequired());
// 3 current racks and 2 expected
bps =
new BlockPlacementStatusDefault(3, 2, 5);
assertTrue(bps.isPlacementPolicySatisfied());
assertEquals(0, bps.getAdditionalReplicasRequired());
// 1 current rack and 2 expected, but only 1 rack on the cluster
bps =
new BlockPlacementStatusDefault(1, 2, 1);
assertTrue(bps.isPlacementPolicySatisfied());
assertEquals(0, bps.getAdditionalReplicasRequired());
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -49,11 +50,13 @@ public class TestBlockPlacementStatusWithUpgradeDomain {
@Test
public void testIsPolicySatisfiedParentFalse() {
when(bpsd.isPlacementPolicySatisfied()).thenReturn(false);
when(bpsd.getAdditionalReplicasRequired()).thenReturn(1);
BlockPlacementStatusWithUpgradeDomain bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
// Parent policy is not satisfied but upgrade domain policy is
assertFalse(bps.isPlacementPolicySatisfied());
assertEquals(1, bps.getAdditionalReplicasRequired());
}
@Test
@ -63,21 +66,73 @@ public class TestBlockPlacementStatusWithUpgradeDomain {
// Number of domains, replicas and upgradeDomainFactor is equal and parent
// policy is satisfied
assertTrue(bps.isPlacementPolicySatisfied());
assertEquals(0, bps.getAdditionalReplicasRequired());
}
@Test
public void testIsPolicySatisifedSmallDomains() {
public void testIsPolicySatisfiedSmallDomains() {
// Number of domains is less than replicas but equal to factor
BlockPlacementStatusWithUpgradeDomain bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 3);
assertTrue(bps.isPlacementPolicySatisfied());
assertEquals(0, bps.getAdditionalReplicasRequired());
// Same as above but replicas is greater than factor
bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 2);
assertTrue(bps.isPlacementPolicySatisfied());
assertEquals(0, bps.getAdditionalReplicasRequired());
// Number of domains is less than replicas and factor
bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 4);
assertFalse(bps.isPlacementPolicySatisfied());
assertEquals(1, bps.getAdditionalReplicasRequired());
}
@Test
public void testIsPolicySatisfiedSmallReplicas() {
// Replication factor 1 file
upgradeDomains.clear();
upgradeDomains.add("1");
BlockPlacementStatusWithUpgradeDomain bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 1, 3);
assertTrue(bps.isPlacementPolicySatisfied());
assertEquals(0, bps.getAdditionalReplicasRequired());
// Replication factor 2 file, but one domain
bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3);
assertFalse(bps.isPlacementPolicySatisfied());
assertEquals(1, bps.getAdditionalReplicasRequired());
// Replication factor 2 file, but two domains
upgradeDomains.add("2");
bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3);
assertTrue(bps.isPlacementPolicySatisfied());
assertEquals(0, bps.getAdditionalReplicasRequired());
}
@Test
public void testPolicyIsNotSatisfiedInsufficientDomains() {
// Insufficient Domains - 1 domain, replication factor 3
upgradeDomains.clear();
upgradeDomains.add("1");
BlockPlacementStatusWithUpgradeDomain bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
assertFalse(bps.isPlacementPolicySatisfied());
assertEquals(2, bps.getAdditionalReplicasRequired());
// One domain, replication factor 2 file
bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 2, 3);
assertFalse(bps.isPlacementPolicySatisfied());
assertEquals(1, bps.getAdditionalReplicasRequired());
// 2 domains, replication factor 3
upgradeDomains.add("2");
bps =
new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3);
assertFalse(bps.isPlacementPolicySatisfied());
assertEquals(1, bps.getAdditionalReplicasRequired());
}
}

View File

@ -18,14 +18,10 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.slf4j.Logger;
@ -48,6 +44,8 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import org.slf4j.event.Level;
import static org.junit.Assert.*;
public class TestBlocksWithNotEnoughRacks {
public static final Logger LOG =
LoggerFactory.getLogger(TestBlocksWithNotEnoughRacks.class);
@ -545,4 +543,105 @@ public class TestBlocksWithNotEnoughRacks {
hostsFileWriter.cleanup();
}
}
@Test
public void testMultipleReplicasScheduledForUpgradeDomain() throws Exception {
Configuration conf = getConf();
final short replicationFactor = 3;
final Path filePath = new Path("/testFile");
conf.set("dfs.block.replicator.classname",
"org.apache.hadoop.hdfs.server.blockmanagement." +
"BlockPlacementPolicyWithUpgradeDomain");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(6).build();
cluster.waitClusterUp();
List<DatanodeDescriptor> dnDescriptors = getDnDescriptors(cluster);
try {
// Create a file with one block with a replication factor of 3
// No upgrade domains are set.
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, replicationFactor, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
BlockManager bm = cluster.getNamesystem().getBlockManager();
BlockInfo storedBlock = bm.getStoredBlock(b.getLocalBlock());
// The block should be replicated OK - so Reconstruction Work will be null
BlockReconstructionWork work = bm.scheduleReconstruction(storedBlock, 2);
assertNull(work);
// Set the upgradeDomain to "3" for the 3 nodes hosting the block.
// Then alternately set the remaining 3 nodes to have an upgradeDomain
// of 0 or 1 giving a total of 3 upgradeDomains.
for (int i=0; i<storedBlock.getReplication(); i++) {
storedBlock.getDatanode(i).setUpgradeDomain("3");
}
int udInd = 0;
for (DatanodeDescriptor d : dnDescriptors) {
if (d.getUpgradeDomain() == null) {
d.setUpgradeDomain(Integer.toString(udInd % 2));
udInd++;
}
}
// Now reconWork is non-null and 2 extra targets are needed
work = bm.scheduleReconstruction(storedBlock, 2);
assertEquals(2, work.getAdditionalReplRequired());
// Add the block to the replication queue and ensure it is replicated
// correctly.
bm.neededReconstruction.add(storedBlock, 3, 0, 0, replicationFactor);
DFSTestUtil.waitForReplication(cluster, b, 1, replicationFactor, 0, 3);
} finally {
cluster.shutdown();
}
}
@Test
public void testUnderReplicatedRespectsRacksAndUpgradeDomain()
throws Exception {
Configuration conf = getConf();
final short replicationFactor = 3;
final Path filePath = new Path("/testFile");
conf.set("dfs.block.replicator.classname",
"org.apache.hadoop.hdfs.server.blockmanagement." +
"BlockPlacementPolicyWithUpgradeDomain");
// All hosts are on two racks
String[] racks = {"/r1", "/r1", "/r1", "/r2", "/r2", "/r2"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(6).racks(racks).build();
cluster.waitClusterUp();
List<DatanodeDescriptor> dnDescriptors = getDnDescriptors(cluster);
for (int i=0; i < dnDescriptors.size(); i++) {
dnDescriptors.get(i).setUpgradeDomain(Integer.toString(i%3));
}
try {
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, (short)1, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
fs.setReplication(filePath, replicationFactor);
DFSTestUtil.waitForReplication(cluster, b, 2, replicationFactor, 0, 3);
} finally {
cluster.shutdown();
}
}
private List<DatanodeDescriptor> getDnDescriptors(MiniDFSCluster cluster)
throws IOException {
List<DatanodeDescriptor> dnDesc = new ArrayList<>();
DatanodeManager dnManager = cluster.getNamesystem().getBlockManager()
.getDatanodeManager();
for (DataNode dn : cluster.getDataNodes()) {
DatanodeDescriptor d = dnManager.getDatanode(dn.getDatanodeUuid());
if (d == null) {
throw new IOException("DatanodeDescriptor not found for DN "+
dn.getDatanodeUuid());
}
dnDesc.add(d);
}
return dnDesc;
}
}

View File

@ -42,6 +42,11 @@ public class BlockPlacementPolicyAlwaysSatisfied
public String getErrorDescription() {
return null;
}
@Override
public int getAdditionalReplicasRequired() {
return 0;
}
}
@Override